blob: e24c25c5ecf711e917e19104b44bb53bf9f90df0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.component.details.FlowChangeExtensionDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.authorization.AuthorizeControllerServiceReference;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.ComponentAuthorizable;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.ApplicationResource.ReplicationTarget;
import org.apache.nifi.web.api.dto.AllowableValueDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.entity.AllowableValueEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
import org.apache.nifi.web.util.ClientResponseUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* Implements the NiFiWebConfigurationContext interface to support a context in both standalone and clustered environments.
*/
public class StandardNiFiWebConfigurationContext implements NiFiWebConfigurationContext {
private static final Logger logger = LoggerFactory.getLogger(StandardNiFiWebConfigurationContext.class);
private NiFiProperties properties;
private NiFiServiceFacade serviceFacade;
private ClusterCoordinator clusterCoordinator;
private RequestReplicator requestReplicator;
private ControllerServiceProvider controllerServiceProvider;
private ReportingTaskProvider reportingTaskProvider;
private AuditService auditService;
private Authorizer authorizer;
private VariableRegistry variableRegistry;
private void authorizeFlowAccess(final NiFiUser user) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable flow = lookup.getFlow();
flow.authorize(authorizer, RequestAction.READ, user);
});
}
@Override
public ControllerService getControllerService(final String serviceIdentifier, final String componentId) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
authorizeFlowAccess(user);
return controllerServiceProvider.getControllerServiceForComponent(serviceIdentifier, componentId);
}
@Override
public void saveActions(final NiFiWebRequestContext requestContext, final Collection<ConfigurationAction> configurationActions) {
Objects.requireNonNull(configurationActions, "Actions cannot be null.");
// ensure the path could be
if (requestContext.getExtensionType() == null) {
throw new IllegalArgumentException("The UI extension type must be specified.");
}
Component componentType = null;
switch (requestContext.getExtensionType()) {
case ProcessorConfiguration:
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable authorizable = lookup.getProcessor(requestContext.getId()).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
componentType = Component.Processor;
break;
case ControllerServiceConfiguration:
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable authorizable = lookup.getControllerService(requestContext.getId()).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
componentType = Component.ControllerService;
break;
case ReportingTaskConfiguration:
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable authorizable = lookup.getReportingTask(requestContext.getId()).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
componentType = Component.ReportingTask;
break;
}
if (componentType == null) {
throw new IllegalArgumentException("UI extension type must support Processor, ControllerService, or ReportingTask configuration.");
}
// - when running standalone or cluster ncm - actions from custom UIs are stored locally
// - clustered nodes do not serve custom UIs directly to users so they should never be invoking this method
final Date now = new Date();
final Collection<Action> actions = new HashSet<>(configurationActions.size());
for (final ConfigurationAction configurationAction : configurationActions) {
final FlowChangeExtensionDetails extensionDetails = new FlowChangeExtensionDetails();
extensionDetails.setType(configurationAction.getType());
final FlowChangeConfigureDetails configureDetails = new FlowChangeConfigureDetails();
configureDetails.setName(configurationAction.getName());
configureDetails.setPreviousValue(configurationAction.getPreviousValue());
configureDetails.setValue(configurationAction.getValue());
final FlowChangeAction action = new FlowChangeAction();
action.setTimestamp(now);
action.setSourceId(configurationAction.getId());
action.setSourceName(configurationAction.getName());
action.setSourceType(componentType);
action.setOperation(Operation.Configure);
action.setUserIdentity(getCurrentUserIdentity());
action.setComponentDetails(extensionDetails);
action.setActionDetails(configureDetails);
actions.add(action);
}
if (!actions.isEmpty()) {
try {
// record the operations
auditService.addActions(actions);
} catch (final Throwable t) {
logger.warn("Unable to record actions: " + t.getMessage());
if (logger.isDebugEnabled()) {
logger.warn(StringUtils.EMPTY, t);
}
}
}
}
@Override
public String getCurrentUserIdentity() {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
authorizeFlowAccess(user);
return user.getIdentity();
}
@Override
public ComponentDetails getComponentDetails(final NiFiWebRequestContext requestContext) throws ResourceNotFoundException, ClusterRequestException {
final String id = requestContext.getId();
if (StringUtils.isBlank(id)) {
throw new ResourceNotFoundException(String.format("Configuration request context config did not have a component ID."));
}
// ensure the path could be
if (requestContext.getExtensionType() == null) {
throw new IllegalArgumentException("The UI extension type must be specified.");
}
// get the component facade for interacting directly with that type of object
ComponentFacade componentFacade = null;
switch (requestContext.getExtensionType()) {
case ProcessorConfiguration:
componentFacade = new ProcessorFacade();
break;
case ControllerServiceConfiguration:
componentFacade = new ControllerServiceFacade();
break;
case ReportingTaskConfiguration:
componentFacade = new ReportingTaskFacade();
break;
}
if (componentFacade == null) {
throw new IllegalArgumentException("UI extension type must support Processor, ControllerService, or ReportingTask configuration.");
}
return componentFacade.getComponentDetails(requestContext);
}
@Override
public ComponentDetails updateComponent(final NiFiWebConfigurationRequestContext requestContext, final String annotationData, Map<String, String> properties)
throws ResourceNotFoundException, InvalidRevisionException, ClusterRequestException {
final String id = requestContext.getId();
if (StringUtils.isBlank(id)) {
throw new ResourceNotFoundException(String.format("Configuration request context did not have a component ID."));
}
// ensure the path could be
if (requestContext.getExtensionType() == null) {
throw new IllegalArgumentException("The UI extension type must be specified.");
}
// get the component facade for interacting directly with that type of object
ComponentFacade componentFacade = null;
switch (requestContext.getExtensionType()) {
case ProcessorConfiguration:
componentFacade = new ProcessorFacade();
break;
case ControllerServiceConfiguration:
componentFacade = new ControllerServiceFacade();
break;
case ReportingTaskConfiguration:
componentFacade = new ReportingTaskFacade();
break;
}
if (componentFacade == null) {
throw new IllegalArgumentException("UI extension type must support Processor, ControllerService, or ReportingTask configuration.");
}
// if we're clustered, ensure this node is not disconnected
if (StandardNiFiWebConfigurationContext.this.properties.isClustered() && clusterCoordinator != null && !clusterCoordinator.isConnected()) {
// if we are disconnected, ensure the disconnection is acknowledged
if (!Boolean.TRUE.equals(requestContext.isDisconnectionAcknowledged())) {
throw new IllegalArgumentException("This node is disconnected from its configured cluster. The requested change "
+ "will only be allowed if the flag to acknowledge the disconnected node is set.");
}
}
return componentFacade.updateComponent(requestContext, annotationData, properties);
}
private ReplicationTarget getReplicationTarget() {
return clusterCoordinator.isActiveClusterCoordinator() ? ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR;
}
private NodeResponse replicate(final String method, final URI uri, final Object entity, final Map<String, String> headers) throws InterruptedException {
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
// to the cluster nodes themselves.
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
return requestReplicator.replicate(method, uri, entity, headers).awaitMergedResponse();
} else {
final NodeIdentifier coordinatorNode = clusterCoordinator.getElectedActiveCoordinatorNode();
if (coordinatorNode == null) {
throw new NoClusterCoordinatorException();
}
return requestReplicator.forwardToCoordinator(coordinatorNode, method, uri, entity, headers).awaitMergedResponse();
}
}
/**
* Facade over accessing different types of NiFi components.
*/
private interface ComponentFacade {
/**
* Gets the component details using the specified request context.
*
* @param requestContext context
* @return the component details using the specified request context
*/
ComponentDetails getComponentDetails(NiFiWebRequestContext requestContext);
/**
* Sets the annotation data using the specified request context.
*
* @param requestContext context
* @param annotationData data
* @param properties properties
* @return details
*/
ComponentDetails updateComponent(NiFiWebConfigurationRequestContext requestContext, String annotationData, Map<String, String> properties);
}
/**
* Interprets the request/response with the underlying Processor model.
*/
private class ProcessorFacade implements ComponentFacade {
@Override
public ComponentDetails getComponentDetails(final NiFiWebRequestContext requestContext) {
final String id = requestContext.getId();
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable authorizable = lookup.getProcessor(id).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
});
final ProcessorDTO processor;
if (properties.isClustered() && clusterCoordinator != null && clusterCoordinator.isConnected()) {
// create the request URL
URI requestUrl;
try {
final String path = "/nifi-api/processors/" + URLEncoder.encode(id, "UTF-8");
requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null);
} catch (final URISyntaxException | UnsupportedEncodingException use) {
throw new ClusterRequestException(use);
}
// set the request parameters
final MultivaluedMap<String, String> parameters = new MultivaluedHashMap();
// replicate request
NodeResponse nodeResponse;
try {
nodeResponse = replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext));
} catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
}
// check for issues replicating request
checkResponse(nodeResponse, id);
// return processor
ProcessorEntity entity = (ProcessorEntity) nodeResponse.getUpdatedEntity();
if (entity == null) {
entity = nodeResponse.getClientResponse().readEntity(ProcessorEntity.class);
}
processor = entity.getComponent();
} else {
processor = serviceFacade.getProcessor(id).getComponent();
}
// return the processor info
return getComponentConfiguration(processor);
}
@Override
public ComponentDetails updateComponent(final NiFiWebConfigurationRequestContext requestContext, final String annotationData, Map<String, String> properties) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final Revision revision = requestContext.getRevision();
final String id = requestContext.getId();
// authorize access
serviceFacade.authorizeAccess(lookup -> {
// authorize the processor
final ComponentAuthorizable authorizable = lookup.getProcessor(id);
authorizable.getAuthorizable().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
// authorize any referenced service
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(properties, authorizable, authorizer, lookup);
});
final ProcessorDTO processor;
if (StandardNiFiWebConfigurationContext.this.properties.isClustered() && clusterCoordinator != null && clusterCoordinator.isConnected()) {
// create the request URL
URI requestUrl;
try {
final String path = "/nifi-api/processors/" + URLEncoder.encode(id, "UTF-8");
requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null);
} catch (final URISyntaxException | UnsupportedEncodingException use) {
throw new ClusterRequestException(use);
}
// create the revision
final RevisionDTO revisionDto = new RevisionDTO();
revisionDto.setClientId(revision.getClientId());
revisionDto.setVersion(revision.getVersion());
// create the processor entity
final ProcessorEntity processorEntity = new ProcessorEntity();
processorEntity.setRevision(revisionDto);
// create the processor dto
ProcessorDTO processorDto = buildProcessorDto(id,annotationData,properties);
processorEntity.setComponent(processorDto);
// set the content type to json
final Map<String, String> headers = getHeaders(requestContext);
headers.put("Content-Type", "application/json");
// replicate request
NodeResponse nodeResponse;
try {
nodeResponse = replicate(HttpMethod.PUT, requestUrl, processorEntity, headers);
} catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
}
// check for issues replicating request
checkResponse(nodeResponse, id);
// return processor
ProcessorEntity entity = (ProcessorEntity) nodeResponse.getUpdatedEntity();
if (entity == null) {
entity = nodeResponse.getClientResponse().readEntity(ProcessorEntity.class);
}
processor = entity.getComponent();
} else {
// update processor within write lock
ProcessorDTO processorDTO = buildProcessorDto(id, annotationData, properties);
final ProcessorEntity entity = serviceFacade.updateProcessor(revision, processorDTO);
processor = entity.getComponent();
}
// return the processor info
return getComponentConfiguration(processor);
}
private ProcessorDTO buildProcessorDto(String id, final String annotationData, Map<String, String> properties){
ProcessorDTO processorDto = new ProcessorDTO();
processorDto.setId(id);
ProcessorConfigDTO configDto = new ProcessorConfigDTO();
processorDto.setConfig(configDto);
configDto.setAnnotationData(annotationData);
configDto.setProperties(properties);
return processorDto;
}
private ComponentDetails getComponentConfiguration(final ProcessorDTO processor) {
final ProcessorConfigDTO processorConfig = processor.getConfig();
return new ComponentDetails.Builder()
.id(processor.getId())
.name(processor.getName())
.type(processor.getType())
.state(processor.getState())
.annotationData(processorConfig.getAnnotationData())
.properties(processorConfig.getProperties())
.descriptors(buildComponentDescriptorMap(processorConfig))
.validateErrors(processor.getValidationErrors()).build();
}
private Map<String,ComponentDescriptor> buildComponentDescriptorMap(final ProcessorConfigDTO processorConfig){
final Map<String, ComponentDescriptor> descriptors = new HashMap<>();
for(String key : processorConfig.getDescriptors().keySet()){
PropertyDescriptorDTO descriptor = processorConfig.getDescriptors().get(key);
List<AllowableValueEntity> allowableValuesEntity = descriptor.getAllowableValues();
Map<String,String> allowableValues = new HashMap<>();
if(allowableValuesEntity != null) {
for (AllowableValueEntity allowableValueEntity : allowableValuesEntity) {
final AllowableValueDTO allowableValueDTO = allowableValueEntity.getAllowableValue();
allowableValues.put(allowableValueDTO.getValue(), allowableValueDTO.getDisplayName());
}
}
ComponentDescriptor componentDescriptor = new ComponentDescriptor.Builder()
.name(descriptor.getName())
.displayName(descriptor.getDisplayName())
.defaultValue(descriptor.getDefaultValue())
.allowableValues(allowableValues)
.build();
descriptors.put(key,componentDescriptor);
}
return descriptors;
}
}
/**
* Interprets the request/response with the underlying ControllerService model.
*/
private class ControllerServiceFacade implements ComponentFacade {
@Override
public ComponentDetails getComponentDetails(final NiFiWebRequestContext requestContext) {
final String id = requestContext.getId();
final ControllerServiceDTO controllerService;
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable authorizable = lookup.getControllerService(id).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
});
if (properties.isClustered() && clusterCoordinator != null && clusterCoordinator.isConnected()) {
// create the request URL
URI requestUrl;
try {
String path = "/nifi-api/controller-services/" + URLEncoder.encode(id, "UTF-8");
requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null);
} catch (final URISyntaxException | UnsupportedEncodingException use) {
throw new ClusterRequestException(use);
}
// set the request parameters
final MultivaluedMap<String, String> parameters = new MultivaluedHashMap();
// replicate request
NodeResponse nodeResponse;
try {
nodeResponse = replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext));
} catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
}
// check for issues replicating request
checkResponse(nodeResponse, id);
// return controller service
ControllerServiceEntity entity = (ControllerServiceEntity) nodeResponse.getUpdatedEntity();
if (entity == null) {
entity = nodeResponse.getClientResponse().readEntity(ControllerServiceEntity.class);
}
controllerService = entity.getComponent();
} else {
controllerService = serviceFacade.getControllerService(id).getComponent();
}
// return the controller service info
return getComponentConfiguration(controllerService);
}
@Override
public ComponentDetails updateComponent(final NiFiWebConfigurationRequestContext requestContext, final String annotationData, Map<String, String> properties) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final Revision revision = requestContext.getRevision();
final String id = requestContext.getId();
// authorize access
serviceFacade.authorizeAccess(lookup -> {
// authorize the controller service
final ComponentAuthorizable authorizable = lookup.getControllerService(id);
authorizable.getAuthorizable().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
// authorize any referenced service
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(properties, authorizable, authorizer, lookup);
});
final ControllerServiceDTO controllerService;
if (StandardNiFiWebConfigurationContext.this.properties.isClustered() && clusterCoordinator != null && clusterCoordinator.isConnected()) {
// create the request URL
URI requestUrl;
try {
String path = "/nifi-api/controller-services/" + URLEncoder.encode(id, "UTF-8");
requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null);
} catch (final URISyntaxException | UnsupportedEncodingException use) {
throw new ClusterRequestException(use);
}
// create the revision
final RevisionDTO revisionDto = new RevisionDTO();
revisionDto.setClientId(revision.getClientId());
revisionDto.setVersion(revision.getVersion());
// create the controller service entity
final ControllerServiceEntity controllerServiceEntity = new ControllerServiceEntity();
controllerServiceEntity.setRevision(revisionDto);
// create the controller service dto
final ControllerServiceDTO controllerServiceDto = new ControllerServiceDTO();
controllerServiceEntity.setComponent(controllerServiceDto);
controllerServiceDto.setId(id);
controllerServiceDto.setAnnotationData(annotationData);
controllerServiceDto.setProperties(properties);
// set the content type to json
final Map<String, String> headers = getHeaders(requestContext);
headers.put("Content-Type", "application/json");
// replicate request
NodeResponse nodeResponse;
try {
nodeResponse = replicate(HttpMethod.PUT, requestUrl, controllerServiceEntity, headers);
} catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
}
// check for issues replicating request
checkResponse(nodeResponse, id);
// return controller service
ControllerServiceEntity entity = (ControllerServiceEntity) nodeResponse.getUpdatedEntity();
if (entity == null) {
entity = nodeResponse.getClientResponse().readEntity(ControllerServiceEntity.class);
}
controllerService = entity.getComponent();
} else {
final ControllerServiceDTO controllerServiceDto = new ControllerServiceDTO();
controllerServiceDto.setId(id);
controllerServiceDto.setAnnotationData(annotationData);
controllerServiceDto.setProperties(properties);
// update controller service
final ControllerServiceEntity entity = serviceFacade.updateControllerService(revision, controllerServiceDto);
controllerService = entity.getComponent();
}
// return the controller service info
return getComponentConfiguration(controllerService);
}
private ComponentDetails getComponentConfiguration(final ControllerServiceDTO controllerService) {
return new ComponentDetails.Builder()
.id(controllerService.getId())
.name(controllerService.getName())
.type(controllerService.getType())
.state(controllerService.getState())
.annotationData(controllerService.getAnnotationData())
.properties(controllerService.getProperties())
.validateErrors(controllerService.getValidationErrors()).build();
}
}
/**
* Interprets the request/response with the underlying ControllerService model.
*/
private class ReportingTaskFacade implements ComponentFacade {
@Override
public ComponentDetails getComponentDetails(final NiFiWebRequestContext requestContext) {
final String id = requestContext.getId();
final ReportingTaskDTO reportingTask;
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable authorizable = lookup.getReportingTask(id).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
});
if (properties.isClustered() && clusterCoordinator != null && clusterCoordinator.isConnected()) {
// create the request URL
URI requestUrl;
try {
String path = "/nifi-api/reporting-tasks/" + URLEncoder.encode(id, "UTF-8");
requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null);
} catch (final URISyntaxException | UnsupportedEncodingException use) {
throw new ClusterRequestException(use);
}
// set the request parameters
final MultivaluedMap<String, String> parameters = new MultivaluedHashMap();
// replicate request
NodeResponse nodeResponse;
try {
nodeResponse = replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext));
} catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
}
// check for issues replicating request
checkResponse(nodeResponse, id);
// return reporting task
ReportingTaskEntity entity = (ReportingTaskEntity) nodeResponse.getUpdatedEntity();
if (entity == null) {
entity = nodeResponse.getClientResponse().readEntity(ReportingTaskEntity.class);
}
reportingTask = entity.getComponent();
} else {
reportingTask = serviceFacade.getReportingTask(id).getComponent();
}
// return the reporting task info
return getComponentConfiguration(reportingTask);
}
@Override
public ComponentDetails updateComponent(final NiFiWebConfigurationRequestContext requestContext, final String annotationData, Map<String, String> properties) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final Revision revision = requestContext.getRevision();
final String id = requestContext.getId();
// authorize access
serviceFacade.authorizeAccess(lookup -> {
// authorize the reporting task
final ComponentAuthorizable authorizable = lookup.getReportingTask(id);
authorizable.getAuthorizable().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
// authorize any referenced service
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(properties, authorizable, authorizer, lookup);
});
final ReportingTaskDTO reportingTask;
if (StandardNiFiWebConfigurationContext.this.properties.isClustered() && clusterCoordinator != null && clusterCoordinator.isConnected()) {
// create the request URL
URI requestUrl;
try {
String path = "/nifi-api/reporting-tasks/" + URLEncoder.encode(id, "UTF-8");
requestUrl = new URI(requestContext.getScheme(), null, "localhost", 0, path, null, null);
} catch (final URISyntaxException | UnsupportedEncodingException use) {
throw new ClusterRequestException(use);
}
// create the revision
final RevisionDTO revisionDto = new RevisionDTO();
revisionDto.setClientId(revision.getClientId());
revisionDto.setVersion(revision.getVersion());
// create the reporting task entity
final ReportingTaskEntity reportingTaskEntity = new ReportingTaskEntity();
reportingTaskEntity.setRevision(revisionDto);
// create the reporting task dto
final ReportingTaskDTO reportingTaskDto = new ReportingTaskDTO();
reportingTaskEntity.setComponent(reportingTaskDto);
reportingTaskDto.setId(id);
reportingTaskDto.setAnnotationData(annotationData);
reportingTaskDto.setProperties(properties);
// set the content type to json
final Map<String, String> headers = getHeaders(requestContext);
headers.put("Content-Type", "application/json");
// replicate request
NodeResponse nodeResponse;
try {
nodeResponse = replicate(HttpMethod.PUT, requestUrl, reportingTaskEntity, headers);
} catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
}
// check for issues replicating request
checkResponse(nodeResponse, id);
// return reporting task
ReportingTaskEntity entity = (ReportingTaskEntity) nodeResponse.getUpdatedEntity();
if (entity == null) {
entity = nodeResponse.getClientResponse().readEntity(ReportingTaskEntity.class);
}
reportingTask = entity.getComponent();
} else {
final ReportingTaskDTO reportingTaskDto = new ReportingTaskDTO();
reportingTaskDto.setId(id);
reportingTaskDto.setAnnotationData(annotationData);
reportingTaskDto.setProperties(properties);
// obtain write lock
serviceFacade.verifyRevision(revision, user);
final ReportingTaskEntity entity = serviceFacade.updateReportingTask(revision, reportingTaskDto);
reportingTask = entity.getComponent();
}
// return the processor info
return getComponentConfiguration(reportingTask);
}
private ComponentDetails getComponentConfiguration(final ReportingTaskDTO reportingTask) {
return new ComponentDetails.Builder()
.id(reportingTask.getId())
.name(reportingTask.getName())
.type(reportingTask.getType())
.state(reportingTask.getState())
.annotationData(reportingTask.getAnnotationData())
.properties(reportingTask.getProperties())
.validateErrors(reportingTask.getValidationErrors()).build();
}
}
/**
* Gets the headers for the request to replicate to each node while clustered.
*/
private Map<String, String> getHeaders(final NiFiWebRequestContext config) {
final Map<String, String> headers = new HashMap<>();
headers.put("Accept", "application/json,application/xml");
return headers;
}
/**
* Checks the specified response and drains the stream appropriately.
*/
private void checkResponse(final NodeResponse nodeResponse, final String id) {
if (nodeResponse.hasThrowable()) {
ClientResponseUtils.drainClientResponse(nodeResponse.getClientResponse());
throw new ClusterRequestException(nodeResponse.getThrowable());
} else if (nodeResponse.getClientResponse().getStatus() == Response.Status.CONFLICT.getStatusCode()) {
ClientResponseUtils.drainClientResponse(nodeResponse.getClientResponse());
throw new InvalidRevisionException(String.format("NiFi is unable to process the request at this time."));
} else if (nodeResponse.getClientResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode()) {
ClientResponseUtils.drainClientResponse(nodeResponse.getClientResponse());
throw new ResourceNotFoundException("Unable to find component with id: " + id);
} else if (nodeResponse.getClientResponse().getStatus() != Response.Status.OK.getStatusCode()) {
ClientResponseUtils.drainClientResponse(nodeResponse.getClientResponse());
throw new ClusterRequestException("Method resulted in an unsuccessful HTTP response code: " + nodeResponse.getClientResponse().getStatus());
}
}
public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) {
this.clusterCoordinator = clusterCoordinator;
}
public void setRequestReplicator(final RequestReplicator requestReplicator) {
this.requestReplicator = requestReplicator;
}
public void setProperties(final NiFiProperties properties) {
this.properties = properties;
}
public void setServiceFacade(final NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;
}
public void setAuditService(final AuditService auditService) {
this.auditService = auditService;
}
public void setControllerServiceProvider(final ControllerServiceProvider controllerServiceProvider) {
this.controllerServiceProvider = controllerServiceProvider;
}
public void setReportingTaskProvider(final ReportingTaskProvider reportingTaskProvider) {
this.reportingTaskProvider = reportingTaskProvider;
}
public void setAuthorizer(final Authorizer authorizer) {
this.authorizer = authorizer;
}
public void setVariableRegistry(final VariableRegistry variableRegistry){
this.variableRegistry = variableRegistry;
}
}