NIFI-40: merged changes from NIFI-250 branch and develop
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java
index 6a9c7fc..b9e7ff1 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java
@@ -29,18 +29,21 @@
private String name;
private String comment;
private String type;
- private String schedulingPeriod;
private String state;
- private String schedulingStrategy;
private String availability;
+ private String schedulingPeriod;
+ private String schedulingStrategy;
+ private Map<String, String> defaultSchedulingPeriod;
+
private Map<String, String> properties;
private Map<String, PropertyDescriptorDTO> descriptors;
private String annotationData;
private Collection<String> validationErrors;
-
+ private Integer activeThreadCount;
+
/**
* The user-defined name of the reporting task
* @return
@@ -180,4 +183,31 @@
public void setValidationErrors(Collection<String> validationErrors) {
this.validationErrors = validationErrors;
}
+
+ /**
+ * The default scheduling period for the different scheduling strategies.
+ *
+ * @return
+ */
+ public Map<String, String> getDefaultSchedulingPeriod() {
+ return defaultSchedulingPeriod;
+ }
+
+ public void setDefaultSchedulingPeriod(Map<String, String> defaultSchedulingPeriod) {
+ this.defaultSchedulingPeriod = defaultSchedulingPeriod;
+ }
+
+ /**
+ * The number of active threads for this reporting task.
+ *
+ * @return
+ */
+ public Integer getActiveThreadCount() {
+ return activeThreadCount;
+ }
+
+ public void setActiveThreadCount(Integer activeThreadCount) {
+ this.activeThreadCount = activeThreadCount;
+ }
+
}
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 2f17a35..8c266be 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -219,6 +219,15 @@
import org.xml.sax.SAXParseException;
import com.sun.jersey.api.client.ClientResponse;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
+import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+import org.apache.nifi.web.api.entity.ReportingTaskEntity;
+import org.apache.nifi.web.api.entity.ReportingTasksEntity;
/**
* Provides a cluster manager implementation. The manager federates incoming
@@ -306,7 +315,13 @@
public static final String PROVENANCE_URI = "/nifi-api/controller/provenance";
public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}");
public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+");
-
+
+ public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
+ public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
+ public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references");
+ public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
+ public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
+
private final NiFiProperties properties;
private final HttpRequestReplicator httpRequestReplicator;
private final HttpResponseMapper httpResponseMapper;
@@ -1410,6 +1425,15 @@
controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode);
}
+ @Override
+ public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.verifyCanDisableReferencingServices(serviceNode);
+ }
+
+ @Override
+ public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.verifyCanStopReferencingComponents(serviceNode);
+ }
private byte[] serialize(final Document doc) throws TransformerException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -2362,13 +2386,51 @@
private static boolean isProvenanceEventEndpoint(final URI uri, final String method) {
return "GET".equalsIgnoreCase(method) && PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches();
}
+
+ private static boolean isControllerServicesEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath());
+ }
+
+ private static boolean isControllerServiceEndpoint(final URI uri, final String method) {
+ if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) {
+ return true;
+ } else if ("POST".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath())) {
+ return true;
+ }
+
+ return false;
+ }
+
+ private static boolean isControllerServiceReferenceEndpoint(final URI uri, final String method) {
+ if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) {
+ return true;
+ }
+
+ return false;
+ }
+
+ private static boolean isReportingTasksEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && REPORTING_TASKS_URI.equals(uri.getPath());
+ }
+
+ private static boolean isReportingTaskEndpoint(final URI uri, final String method) {
+ if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REPORTING_TASK_URI_PATTERN.matcher(uri.getPath()).matches()) {
+ return true;
+ } else if ("POST".equalsIgnoreCase(method) && REPORTING_TASKS_URI.equals(uri.getPath())) {
+ return true;
+ }
+
+ return false;
+ }
static boolean isResponseInterpreted(final URI uri, final String method) {
return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, method)
|| isRemoteProcessGroupsEndpoint(uri, method) || isRemoteProcessGroupEndpoint(uri, method)
|| isProcessGroupEndpoint(uri, method)
|| isTemplateEndpoint(uri, method) || isFlowSnippetEndpoint(uri, method)
- || isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method);
+ || isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method)
+ || isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method)
+ || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method);
}
private void mergeProcessorValidationErrors(final ProcessorDTO processor, Map<NodeIdentifier, ProcessorDTO> processorMap) {
@@ -2378,37 +2440,12 @@
final NodeIdentifier nodeId = nodeEntry.getKey();
final ProcessorDTO nodeProcessor = nodeEntry.getValue();
- // get the processor's validation errors and put them into a map
- // where the key is the validation error and the value is the set of all
- // nodes that reported that validation error.
- final Collection<String> nodeValidationErrors = nodeProcessor.getValidationErrors();
- if (nodeValidationErrors != null) {
- for (final String nodeValidationError : nodeValidationErrors) {
- Set<NodeIdentifier> nodeSet = validationErrorMap.get(nodeValidationError);
- if (nodeSet == null) {
- nodeSet = new HashSet<>();
- validationErrorMap.put(nodeValidationError, nodeSet);
- }
- nodeSet.add(nodeId);
- }
- }
+ // merge the validation errors
+ mergeValidationErrors(validationErrorMap, nodeId, nodeProcessor.getValidationErrors());
}
- final Set<String> normalizedValidationErrors = new HashSet<>();
- for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : validationErrorMap.entrySet()) {
- final String msg = validationEntry.getKey();
- final Set<NodeIdentifier> nodeIds = validationEntry.getValue();
-
- if (nodeIds.size() == processorMap.size()) {
- normalizedValidationErrors.add(msg);
- } else {
- for (final NodeIdentifier nodeId : nodeIds) {
- normalizedValidationErrors.add(nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + msg);
- }
- }
- }
-
- processor.setValidationErrors(normalizedValidationErrors);
+ // set the merged the validation errors
+ processor.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, processorMap.size()));
}
private void mergeProvenanceQueryResults(final ProvenanceDTO provenanceDto, final Map<NodeIdentifier, ProvenanceDTO> resultMap, final Set<NodeResponse> problematicResponses) {
@@ -2576,7 +2613,156 @@
remoteProcessGroup.setAuthorizationIssues(mergedAuthorizationIssues);
}
}
+
+ private void mergeControllerServiceReferences(final Set<ControllerServiceReferencingComponentDTO> referencingComponents, final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> referencingComponentMap) {
+ final Map<String, Integer> activeThreadCounts = new HashMap<>();
+ final Map<String, String> states = new HashMap<>();
+ for (final Map.Entry<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeEntry : referencingComponentMap.entrySet()) {
+ final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeEntry.getValue();
+ // go through all the nodes referencing components
+ for (final ControllerServiceReferencingComponentDTO nodeReferencingComponent : nodeReferencingComponents) {
+ // handle active thread counts
+ if (nodeReferencingComponent.getActiveThreadCount() != null && nodeReferencingComponent.getActiveThreadCount() > 0) {
+ final Integer current = activeThreadCounts.get(nodeReferencingComponent.getId());
+ if (current == null) {
+ activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount());
+ } else {
+ activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount() + current);
+ }
+ }
+
+ // handle controller service state
+ final String state = states.get(nodeReferencingComponent.getId());
+ if (state == null) {
+ if (ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState())) {
+ states.put(nodeReferencingComponent.getId(), ControllerServiceState.DISABLING.name());
+ } else if (ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState())) {
+ states.put(nodeReferencingComponent.getId(), ControllerServiceState.ENABLING.name());
+ }
+ }
+ }
+ }
+
+ // go through each referencing components
+ for (final ControllerServiceReferencingComponentDTO referencingComponent : referencingComponents) {
+ final Integer activeThreadCount = activeThreadCounts.get(referencingComponent.getId());
+ if (activeThreadCount != null) {
+ referencingComponent.setActiveThreadCount(activeThreadCount);
+ }
+
+ final String state = states.get(referencingComponent.getId());
+ if (state != null) {
+ referencingComponent.setState(state);
+ }
+ }
+ }
+
+ private void mergeControllerService(final ControllerServiceDTO controllerService, final Map<NodeIdentifier, ControllerServiceDTO> controllerServiceMap) {
+ final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();
+ final Set<ControllerServiceReferencingComponentDTO> referencingComponents = controllerService.getReferencingComponents();
+ final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeReferencingComponentsMap = new HashMap<>();
+
+ String state = null;
+ for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : controllerServiceMap.entrySet()) {
+ final NodeIdentifier nodeId = nodeEntry.getKey();
+ final ControllerServiceDTO nodeControllerService = nodeEntry.getValue();
+
+ if (state == null) {
+ if (ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState())) {
+ state = ControllerServiceState.DISABLING.name();
+ } else if (ControllerServiceState.ENABLING.name().equals(nodeControllerService.getState())) {
+ state = ControllerServiceState.ENABLING.name();
+ }
+ }
+
+ for (final ControllerServiceReferencingComponentDTO nodeReferencingComponents : nodeControllerService.getReferencingComponents()) {
+ nodeReferencingComponentsMap.put(nodeId, nodeReferencingComponents.getReferencingComponents());
+ }
+
+ // merge the validation errors
+ mergeValidationErrors(validationErrorMap, nodeId, nodeControllerService.getValidationErrors());
+ }
+
+ // merge the referencing components
+ mergeControllerServiceReferences(referencingComponents, nodeReferencingComponentsMap);
+
+ // store the 'transition' state is applicable
+ if (state != null) {
+ controllerService.setState(state);
+ }
+
+ // set the merged the validation errors
+ controllerService.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, controllerServiceMap.size()));
+ }
+
+ private void mergeReportingTask(final ReportingTaskDTO reportingTask, final Map<NodeIdentifier, ReportingTaskDTO> reportingTaskMap) {
+ final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();
+
+ int activeThreadCount = 0;
+ for (final Map.Entry<NodeIdentifier, ReportingTaskDTO> nodeEntry : reportingTaskMap.entrySet()) {
+ final NodeIdentifier nodeId = nodeEntry.getKey();
+ final ReportingTaskDTO nodeReportingTask = nodeEntry.getValue();
+
+ if (nodeReportingTask.getActiveThreadCount() != null) {
+ activeThreadCount += nodeReportingTask.getActiveThreadCount();
+ }
+
+ // merge the validation errors
+ mergeValidationErrors(validationErrorMap, nodeId, nodeReportingTask.getValidationErrors());
+ }
+
+ // set the merged active thread counts
+ reportingTask.setActiveThreadCount(activeThreadCount);
+
+ // set the merged the validation errors
+ reportingTask.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, reportingTaskMap.size()));
+ }
+
+ /**
+ * Merges the validation errors into the specified map, recording the corresponding node identifier.
+ *
+ * @param validationErrorMap
+ * @param nodeId
+ * @param nodeValidationErrors
+ */
+ public void mergeValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, final NodeIdentifier nodeId, final Collection<String> nodeValidationErrors) {
+ if (nodeValidationErrors != null) {
+ for (final String nodeValidationError : nodeValidationErrors) {
+ Set<NodeIdentifier> nodeSet = validationErrorMap.get(nodeValidationError);
+ if (nodeSet == null) {
+ nodeSet = new HashSet<>();
+ validationErrorMap.put(nodeValidationError, nodeSet);
+ }
+ nodeSet.add(nodeId);
+ }
+ }
+ }
+
+ /**
+ * Normalizes the validation errors by prepending the corresponding nodes when the error does not exist across all nodes.
+ *
+ * @param validationErrorMap
+ * @param totalNodes
+ * @return
+ */
+ public Set<String> normalizedMergedValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, int totalNodes) {
+ final Set<String> normalizedValidationErrors = new HashSet<>();
+ for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : validationErrorMap.entrySet()) {
+ final String msg = validationEntry.getKey();
+ final Set<NodeIdentifier> nodeIds = validationEntry.getValue();
+
+ if (nodeIds.size() == totalNodes) {
+ normalizedValidationErrors.add(msg);
+ } else {
+ for (final NodeIdentifier nodeId : nodeIds) {
+ normalizedValidationErrors.add(nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + msg);
+ }
+ }
+ }
+ return normalizedValidationErrors;
+ }
+
// requires write lock to be already acquired unless request is not mutable
private NodeResponse mergeResponses(final URI uri, final String method, final Set<NodeResponse> nodeResponses, final boolean mutableRequest) {
// holds the one response of all the node responses to return to the client
@@ -2866,6 +3052,126 @@
event.setClusterNodeAddress(nodeId.getApiAddress() + ":" + nodeId.getApiPort());
clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isControllerServiceEndpoint(uri, method)) {
+ final ControllerServiceEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
+ final ControllerServiceDTO controllerService = responseEntity.getControllerService();
+
+ final Map<NodeIdentifier, ControllerServiceDTO> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final ControllerServiceEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
+ final ControllerServiceDTO nodeControllerService = nodeResponseEntity.getControllerService();
+
+ resultsMap.put(nodeResponse.getNodeId(), nodeControllerService);
+ }
+ mergeControllerService(controllerService, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isControllerServicesEndpoint(uri, method)) {
+ final ControllerServicesEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
+ final Set<ControllerServiceDTO> controllerServices = responseEntity.getControllerServices();
+
+ final Map<String, Map<NodeIdentifier, ControllerServiceDTO>> controllerServiceMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final ControllerServicesEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
+ final Set<ControllerServiceDTO> nodeControllerServices = nodeResponseEntity.getControllerServices();
+
+ for (final ControllerServiceDTO nodeControllerService : nodeControllerServices) {
+ Map<NodeIdentifier, ControllerServiceDTO> innerMap = controllerServiceMap.get(nodeControllerService.getId());
+ if (innerMap == null) {
+ innerMap = new HashMap<>();
+ controllerServiceMap.put(nodeControllerService.getId(), innerMap);
+ }
+
+ innerMap.put(nodeResponse.getNodeId(), nodeControllerService);
+ }
+ }
+
+ for (final ControllerServiceDTO controllerService : controllerServices) {
+ final String procId = controllerService.getId();
+ final Map<NodeIdentifier, ControllerServiceDTO> mergeMap = controllerServiceMap.get(procId);
+
+ mergeControllerService(controllerService, mergeMap);
+ }
+
+ // create a new client response
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isControllerServiceReferenceEndpoint(uri, method)) {
+ final ControllerServiceReferencingComponentsEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+ final Set<ControllerServiceReferencingComponentDTO> referencingComponents = responseEntity.getControllerServiceReferencingComponents();
+
+ final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final ControllerServiceReferencingComponentsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+ final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeResponseEntity.getControllerServiceReferencingComponents();
+
+ resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents);
+ }
+ mergeControllerServiceReferences(referencingComponents, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isReportingTaskEndpoint(uri, method)) {
+ final ReportingTaskEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
+ final ReportingTaskDTO reportingTask = responseEntity.getReportingTask();
+
+ final Map<NodeIdentifier, ReportingTaskDTO> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final ReportingTaskEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
+ final ReportingTaskDTO nodeReportingTask = nodeResponseEntity.getReportingTask();
+
+ resultsMap.put(nodeResponse.getNodeId(), nodeReportingTask);
+ }
+ mergeReportingTask(reportingTask, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isReportingTasksEndpoint(uri, method)) {
+ final ReportingTasksEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
+ final Set<ReportingTaskDTO> reportingTaskSet = responseEntity.getReportingTasks();
+
+ final Map<String, Map<NodeIdentifier, ReportingTaskDTO>> reportingTaskMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final ReportingTasksEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
+ final Set<ReportingTaskDTO> nodeReportingTasks = nodeResponseEntity.getReportingTasks();
+
+ for (final ReportingTaskDTO nodeReportingTask : nodeReportingTasks) {
+ Map<NodeIdentifier, ReportingTaskDTO> innerMap = reportingTaskMap.get(nodeReportingTask.getId());
+ if (innerMap == null) {
+ innerMap = new HashMap<>();
+ reportingTaskMap.put(nodeReportingTask.getId(), innerMap);
+ }
+
+ innerMap.put(nodeResponse.getNodeId(), nodeReportingTask);
+ }
+ }
+
+ for (final ReportingTaskDTO reportingTask : reportingTaskSet) {
+ final String procId = reportingTask.getId();
+ final Map<NodeIdentifier, ReportingTaskDTO> mergeMap = reportingTaskMap.get(procId);
+
+ mergeReportingTask(reportingTask, mergeMap);
+ }
+
+ // create a new client response
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
} else {
if (!nodeResponsesToDrain.isEmpty()) {
drainResponses(nodeResponsesToDrain);
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index 351a036..aac65dc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -75,6 +75,14 @@
*/
Set<ControllerServiceNode> getAllControllerServices();
+ /**
+ * Verifies that all running Processors and Reporting Tasks referencing the Controller Service (or a service
+ * that depends on the provided service) can be stopped.
+ * @param serviceNode
+ *
+ * @throws IllegalStateException if any referencing component cannot be stopped
+ */
+ void verifyCanStopReferencingComponents(ControllerServiceNode serviceNode);
/**
* Recursively unschedules all schedulable components (Processors and Reporting Tasks) that reference the given
@@ -85,6 +93,14 @@
void unscheduleReferencingComponents(ControllerServiceNode serviceNode);
/**
+ * Verifies that all Controller Services referencing the provided Controller Service can be disabled.
+ * @param serviceNode
+ *
+ * @throws IllegalStateException if any referencing service cannot be disabled
+ */
+ void verifyCanDisableReferencingServices(ControllerServiceNode serviceNode);
+
+ /**
* Disables any Controller Service that references the provided Controller Service. This action is performed recursively
* so that if service A references B and B references C, disabling references for C will first disable A, then B.
* @param serviceNode
@@ -94,15 +110,11 @@
/**
* Verifies that all Controller Services referencing the provided ControllerService can be enabled.
* @param serviceNode
+ *
+ * @throws IllegalStateException if any referencing component cannot be enabled
*/
void verifyCanEnableReferencingServices(ControllerServiceNode serviceNode);
- /**
- * Verifies that all enabled Processors referencing the ControllerService (or a service that depends on
- * the provided service) can be scheduled to run.
- * @param serviceNode
- */
- void verifyCanScheduleReferencingComponents(ControllerServiceNode serviceNode);
/**
* Enables all Controller Services that are referencing the given service. If Service A references Service B and Service
@@ -112,6 +124,15 @@
void enableReferencingServices(ControllerServiceNode serviceNode);
/**
+ * Verifies that all enabled Processors referencing the ControllerService (or a service that depends on
+ * the provided service) can be scheduled to run.
+ * @param serviceNode
+ *
+ * @throws IllegalStateException if any referencing component cannot be scheduled
+ */
+ void verifyCanScheduleReferencingComponents(ControllerServiceNode serviceNode);
+
+ /**
* Schedules any schedulable component (Processor, ReportingTask) that is referencing the given Controller Service
* to run. This is performed recursively, so if a Processor is referencing Service A, which is referencing serviceNode,
* then the Processor will also be started.
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index d1e24c0..b77e94d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2512,15 +2512,15 @@
if ( firstTimeAdded ) {
final ComponentLog componentLog = new SimpleProcessLogger(id, taskNode.getReportingTask());
- final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(),
+ final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(),
SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this);
-
+
try {
task.initialize(config);
} catch (final InitializationException ie) {
throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + type, ie);
}
-
+
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
} catch (final Exception e) {
@@ -2652,6 +2652,16 @@
}
@Override
+ public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.verifyCanDisableReferencingServices(serviceNode);
+ }
+
+ @Override
+ public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.verifyCanStopReferencingComponents(serviceNode);
+ }
+
+ @Override
public ControllerService getControllerService(final String serviceIdentifier) {
return controllerServiceProvider.getControllerService(serviceIdentifier);
}
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index c1100c8..2348dcb 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -390,7 +390,7 @@
}
final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask());
- final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(),
+ final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(),
SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller);
try {
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 79132b1..cca825c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -458,4 +458,24 @@
}
}
+ @Override
+ public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
+ // Get a list of all Controller Services that need to be disabled, in the order that they need to be
+ // disabled.
+ final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+ final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
+
+ for ( final ControllerServiceNode nodeToDisable : toDisable ) {
+ final ControllerServiceState state = nodeToDisable.getState();
+
+ if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) {
+ nodeToDisable.verifyCanDisable(serviceSet);
+ }
+ }
+ }
+
+ @Override
+ public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
+ // we can always stop referencing components
+ }
}
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ControllerServiceAuditor.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ControllerServiceAuditor.java
index f6cc131..ea3af14 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ControllerServiceAuditor.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ControllerServiceAuditor.java
@@ -32,12 +32,14 @@
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.web.security.user.NiFiUserUtils;
import org.apache.nifi.user.NiFiUser;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.dao.ControllerServiceDAO;
import org.aspectj.lang.ProceedingJoinPoint;
@@ -219,6 +221,7 @@
* Audits the update of a component referencing a controller service.
*
* @param proceedingJoinPoint
+ * @param controllerServiceId
* @return
* @throws Throwable
*/
@@ -233,49 +236,11 @@
if (user != null) {
final Collection<Action> actions = new ArrayList<>();
+ final Collection<String> visitedServices = new ArrayList<>();
+ visitedServices.add(controllerServiceReference.getReferencedComponent().getIdentifier());
- // consider each component updates
- for (final ConfiguredComponent component : controllerServiceReference.getReferencingComponents()) {
- if (component instanceof ProcessorNode) {
- final ProcessorNode processor = ((ProcessorNode) component);
-
- // create the processor details
- ExtensionDetails processorDetails = new ExtensionDetails();
- processorDetails.setType(processor.getProcessor().getClass().getSimpleName());
-
- // create a processor action
- Action processorAction = new Action();
- processorAction.setUserDn(user.getDn());
- processorAction.setUserName(user.getUserName());
- processorAction.setTimestamp(new Date());
- processorAction.setSourceId(processor.getIdentifier());
- processorAction.setSourceName(processor.getName());
- processorAction.setSourceType(Component.Processor);
- processorAction.setComponentDetails(processorDetails);
- processorAction.setOperation(ScheduledState.RUNNING.equals(processor.getScheduledState()) ? Operation.Start : Operation.Stop);
- actions.add(processorAction);
- } else if (component instanceof ControllerServiceNode) {
- final ControllerServiceNode controllerService = ((ControllerServiceNode) component);
-
- // create the processor details
- ExtensionDetails serviceDetails = new ExtensionDetails();
- serviceDetails.setType(controllerService.getControllerServiceImplementation().getClass().getSimpleName());
-
- // create a controller service action
- Action serviceAction = new Action();
- serviceAction.setUserDn(user.getDn());
- serviceAction.setUserName(user.getUserName());
- serviceAction.setTimestamp(new Date());
- serviceAction.setSourceId(controllerService.getIdentifier());
- serviceAction.setSourceName(controllerService.getName());
- serviceAction.setSourceType(Component.ControllerService);
- serviceAction.setComponentDetails(serviceDetails);
- serviceAction.setOperation(isDisabled(controllerService) ? Operation.Disable : Operation.Enable);
- actions.add(serviceAction);
-
- // need to consider components referencing this controller service (transitive)
- }
- }
+ // get all applicable actions
+ getUpdateActionsForReferencingComponents(user, actions, visitedServices, controllerServiceReference.getReferencingComponents());
// ensure there are actions to record
if (!actions.isEmpty()) {
@@ -288,6 +253,80 @@
}
/**
+ * Gets the update actions for all specified referencing components.
+ *
+ * @param user
+ * @param actions
+ * @param visitedServices
+ * @param referencingComponents
+ */
+ private void getUpdateActionsForReferencingComponents(final NiFiUser user, final Collection<Action> actions, final Collection<String> visitedServices, final Set<ConfiguredComponent> referencingComponents) {
+ // consider each component updates
+ for (final ConfiguredComponent component : referencingComponents) {
+ if (component instanceof ProcessorNode) {
+ final ProcessorNode processor = ((ProcessorNode) component);
+
+ // create the processor details
+ ExtensionDetails processorDetails = new ExtensionDetails();
+ processorDetails.setType(processor.getProcessor().getClass().getSimpleName());
+
+ // create a processor action
+ Action processorAction = new Action();
+ processorAction.setUserDn(user.getDn());
+ processorAction.setUserName(user.getUserName());
+ processorAction.setTimestamp(new Date());
+ processorAction.setSourceId(processor.getIdentifier());
+ processorAction.setSourceName(processor.getName());
+ processorAction.setSourceType(Component.Processor);
+ processorAction.setComponentDetails(processorDetails);
+ processorAction.setOperation(ScheduledState.RUNNING.equals(processor.getScheduledState()) ? Operation.Start : Operation.Stop);
+ actions.add(processorAction);
+ } else if (component instanceof ReportingTask) {
+ final ReportingTaskNode reportingTask = ((ReportingTaskNode) component);
+
+ // create the reporting task details
+ ExtensionDetails processorDetails = new ExtensionDetails();
+ processorDetails.setType(reportingTask.getReportingTask().getClass().getSimpleName());
+
+ // create a reporting task action
+ Action reportingTaskAction = new Action();
+ reportingTaskAction.setUserDn(user.getDn());
+ reportingTaskAction.setUserName(user.getUserName());
+ reportingTaskAction.setTimestamp(new Date());
+ reportingTaskAction.setSourceId(reportingTask.getIdentifier());
+ reportingTaskAction.setSourceName(reportingTask.getName());
+ reportingTaskAction.setSourceType(Component.ReportingTask);
+ reportingTaskAction.setComponentDetails(processorDetails);
+ reportingTaskAction.setOperation(ScheduledState.RUNNING.equals(reportingTask.getScheduledState()) ? Operation.Start : Operation.Stop);
+ actions.add(reportingTaskAction);
+ } else if (component instanceof ControllerServiceNode) {
+ final ControllerServiceNode controllerService = ((ControllerServiceNode) component);
+
+ // create the controller service details
+ ExtensionDetails serviceDetails = new ExtensionDetails();
+ serviceDetails.setType(controllerService.getControllerServiceImplementation().getClass().getSimpleName());
+
+ // create a controller service action
+ Action serviceAction = new Action();
+ serviceAction.setUserDn(user.getDn());
+ serviceAction.setUserName(user.getUserName());
+ serviceAction.setTimestamp(new Date());
+ serviceAction.setSourceId(controllerService.getIdentifier());
+ serviceAction.setSourceName(controllerService.getName());
+ serviceAction.setSourceType(Component.ControllerService);
+ serviceAction.setComponentDetails(serviceDetails);
+ serviceAction.setOperation(isDisabled(controllerService) ? Operation.Disable : Operation.Enable);
+ actions.add(serviceAction);
+
+ // need to consider components referencing this controller service (transitive)
+ if (!visitedServices.contains(controllerService.getIdentifier())) {
+ getUpdateActionsForReferencingComponents(user, actions, visitedServices, controllerService.getReferences().getReferencingComponents());
+ }
+ }
+ }
+ }
+
+ /**
* Audits the removal of a controller service via deleteControllerService().
*
* @param proceedingJoinPoint
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
index aae1afc..9f6b135 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
@@ -345,6 +345,8 @@
* @param annotationData The annotation data for the reporting task
* @param markedForDeletion Array of property names whose value should be removed.
* @param state The updated scheduled state
+ * @param schedulingStrategy The scheduling strategy for this reporting task
+ * @param schedulingPeriod The scheduling period for this reporting task
* @param formParams Additionally, the processor properties and styles are
* specified in the form parameters. Because the property names and styles
* differ from processor to processor they are specified in a map-like
@@ -373,7 +375,8 @@
@FormParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
@PathParam("availability") String availability, @PathParam("id") String id, @FormParam("name") String name,
@FormParam("annotationData") String annotationData, @FormParam("markedForDeletion[]") List<String> markedForDeletion,
- @FormParam("state") String state, MultivaluedMap<String, String> formParams) {
+ @FormParam("state") String state, @FormParam("schedulingStrategy") String schedulingStrategy,
+ @FormParam("schedulingPeriod") String schedulingPeriod, MultivaluedMap<String, String> formParams) {
// create collections for holding the reporting task properties
final Map<String, String> updatedProperties = new LinkedHashMap<>();
@@ -404,6 +407,8 @@
reportingTaskDTO.setId(id);
reportingTaskDTO.setName(name);
reportingTaskDTO.setState(state);
+ reportingTaskDTO.setSchedulingStrategy(schedulingStrategy);
+ reportingTaskDTO.setSchedulingPeriod(schedulingPeriod);
reportingTaskDTO.setAnnotationData(annotationData);
// only set the properties when appropriate
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index ae404de..7286c83 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -843,8 +843,16 @@
dto.setId(reportingTaskNode.getIdentifier());
dto.setName(reportingTaskNode.getName());
dto.setType(reportingTaskNode.getReportingTask().getClass().getName());
+ dto.setSchedulingStrategy(reportingTaskNode.getSchedulingStrategy().name());
+ dto.setSchedulingPeriod(reportingTaskNode.getSchedulingPeriod());
dto.setState(reportingTaskNode.getScheduledState().name());
+ dto.setActiveThreadCount(reportingTaskNode.getActiveThreadCount());
// dto.setComments(reportingTaskNode.getComments());
+
+ final Map<String, String> defaultSchedulingPeriod = new HashMap<>();
+ defaultSchedulingPeriod.put(SchedulingStrategy.TIMER_DRIVEN.name(), SchedulingStrategy.TIMER_DRIVEN.getDefaultSchedulingPeriod());
+ defaultSchedulingPeriod.put(SchedulingStrategy.CRON_DRIVEN.name(), SchedulingStrategy.CRON_DRIVEN.getDefaultSchedulingPeriod());
+ dto.setDefaultSchedulingPeriod(defaultSchedulingPeriod);
// sort a copy of the properties
final Map<PropertyDescriptor, String> sortedProperties = new TreeMap<>(new Comparator<PropertyDescriptor>() {
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index 096058d..38abf64 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -189,15 +189,15 @@
if (controllerServiceState != null) {
if (ControllerServiceState.ENABLED.equals(controllerServiceState)) {
-// serviceProvider.enableReferencingServices(controllerService);
+ serviceProvider.verifyCanEnableReferencingServices(controllerService);
} else {
-// serviceProvider.disableReferencingServices(controllerService);
+ serviceProvider.verifyCanDisableReferencingServices(controllerService);
}
} else if (scheduledState != null) {
if (ScheduledState.RUNNING.equals(scheduledState)) {
-// serviceProvider.scheduleReferencingComponents(controllerService);
+ serviceProvider.verifyCanScheduleReferencingComponents(controllerService);
} else {
-// serviceProvider.unscheduleReferencingComponents(controllerService);
+ serviceProvider.verifyCanStopReferencingComponents(controllerService);
}
}
}
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
index 0bdbe42..41434d3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
@@ -16,26 +16,32 @@
*/
package org.apache.nifi.web.dao.impl;
+import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
-import org.apache.nifi.controller.FlowController;
+import java.util.regex.Matcher;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.reporting.ReportingTaskProvider;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.dao.ReportingTaskDAO;
+import org.quartz.CronExpression;
public class StandardReportingTaskDAO extends ComponentDAO implements ReportingTaskDAO {
- private FlowController reportingTaskProvider;
+ private ReportingTaskProvider reportingTaskProvider;
/**
* Locates the specified reporting task.
@@ -121,10 +127,10 @@
public ReportingTaskNode updateReportingTask(final ReportingTaskDTO reportingTaskDTO) {
// get the reporting task
final ReportingTaskNode reportingTask = locateReportingTask(reportingTaskDTO.getId());
-
+
// ensure we can perform the update
verifyUpdate(reportingTask, reportingTaskDTO);
-
+
// perform the update
configureReportingTask(reportingTask, reportingTaskDTO);
@@ -166,22 +172,57 @@
}
}
}
-
+
return reportingTask;
}
/**
* Validates the specified configuration for the specified reporting task.
- *
+ *
* @param reportingTask
* @param reportingTaskDTO
- * @return
+ * @return
*/
private List<String> validateProposedConfiguration(final ReportingTaskNode reportingTask, final ReportingTaskDTO reportingTaskDTO) {
final List<String> validationErrors = new ArrayList<>();
+
+ // get the current scheduling strategy
+ SchedulingStrategy schedulingStrategy = reportingTask.getSchedulingStrategy();
+
+ // validate the new scheduling strategy if appropriate
+ if (isNotNull(reportingTaskDTO.getSchedulingStrategy())) {
+ try {
+ // this will be the new scheduling strategy so use it
+ schedulingStrategy = SchedulingStrategy.valueOf(reportingTaskDTO.getSchedulingStrategy());
+ } catch (IllegalArgumentException iae) {
+ validationErrors.add(String.format("Scheduling strategy: Value must be one of [%s]", StringUtils.join(SchedulingStrategy.values(), ", ")));
+ }
+ }
+
+ // validate the scheduling period based on the scheduling strategy
+ if (isNotNull(reportingTaskDTO.getSchedulingPeriod())) {
+ switch (schedulingStrategy) {
+ case TIMER_DRIVEN:
+ final Matcher schedulingMatcher = FormatUtils.TIME_DURATION_PATTERN.matcher(reportingTaskDTO.getSchedulingPeriod());
+ if (!schedulingMatcher.matches()) {
+ validationErrors.add("Scheduling period is not a valid time duration (ie 30 sec, 5 min)");
+ }
+ break;
+ case CRON_DRIVEN:
+ try {
+ new CronExpression(reportingTaskDTO.getSchedulingPeriod());
+ } catch (final ParseException pe) {
+ throw new IllegalArgumentException(String.format("Scheduling Period '%s' is not a valid cron expression: %s", reportingTaskDTO.getSchedulingPeriod(), pe.getMessage()));
+ } catch (final Exception e) {
+ throw new IllegalArgumentException("Scheduling Period is not a valid cron expression: " + reportingTaskDTO.getSchedulingPeriod());
+ }
+ break;
+ }
+ }
+
return validationErrors;
}
-
+
@Override
public void verifyDelete(final String reportingTaskId) {
final ReportingTaskNode reportingTask = locateReportingTask(reportingTaskId);
@@ -193,12 +234,12 @@
final ReportingTaskNode reportingTask = locateReportingTask(reportingTaskDTO.getId());
verifyUpdate(reportingTask, reportingTaskDTO);
}
-
+
/**
* Verifies the reporting task can be updated.
- *
+ *
* @param reportingTask
- * @param reportingTaskDTO
+ * @param reportingTaskDTO
*/
private void verifyUpdate(final ReportingTaskNode reportingTask, final ReportingTaskDTO reportingTaskDTO) {
// ensure the state, if specified, is valid
@@ -234,13 +275,15 @@
reportingTaskDTO.getState()));
}
}
-
+
boolean modificationRequest = false;
if (isAnyNotNull(reportingTaskDTO.getName(),
+ reportingTaskDTO.getSchedulingStrategy(),
+ reportingTaskDTO.getSchedulingPeriod(),
reportingTaskDTO.getAnnotationData(),
reportingTaskDTO.getProperties())) {
modificationRequest = true;
-
+
// validate the request
final List<String> requestValidation = validateProposedConfiguration(reportingTask, reportingTaskDTO);
@@ -249,26 +292,36 @@
throw new ValidationException(requestValidation);
}
}
-
+
if (modificationRequest) {
reportingTask.verifyCanUpdate();
}
}
-
+
/**
* Configures the specified reporting task.
- *
+ *
* @param reportingTask
- * @param reportingTaskDTO
+ * @param reportingTaskDTO
*/
private void configureReportingTask(final ReportingTaskNode reportingTask, final ReportingTaskDTO reportingTaskDTO) {
final String name = reportingTaskDTO.getName();
+ final String schedulingStrategy = reportingTaskDTO.getSchedulingStrategy();
+ final String schedulingPeriod = reportingTaskDTO.getSchedulingPeriod();
final String annotationData = reportingTaskDTO.getAnnotationData();
final Map<String, String> properties = reportingTaskDTO.getProperties();
+
+ // ensure scheduling strategy is set first
+ if (isNotNull(schedulingStrategy)) {
+ reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(schedulingStrategy));
+ }
if (isNotNull(name)) {
reportingTask.setName(name);
}
+ if (isNotNull(schedulingPeriod)) {
+ reportingTask.setScheduldingPeriod(schedulingPeriod);
+ }
if (isNotNull(annotationData)) {
reportingTask.setAnnotationData(annotationData);
}
@@ -284,7 +337,7 @@
}
}
}
-
+
/**
* Deletes the specified reporting task.
*
@@ -297,8 +350,7 @@
}
/* setters */
-
- public void setReportingTaskProvider(FlowController reportingTaskProvider) {
+ public void setReportingTaskProvider(ReportingTaskProvider reportingTaskProvider) {
this.reportingTaskProvider = reportingTaskProvider;
}
}
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/controller-service-configuration.jsp b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/controller-service-configuration.jsp
index 338a906..604dd40 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/controller-service-configuration.jsp
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/controller-service-configuration.jsp
@@ -43,7 +43,7 @@
<span id="controller-service-type"></span>
</div>
</div>
- <div id="availability-setting-container" class="setting hidden">
+ <div id="controller-service-availability-setting-container" class="setting hidden">
<div class="availability-setting">
<div class="setting-name">
Availability
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/reporting-task-configuration.jsp b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/reporting-task-configuration.jsp
index c1d816f..3b4906c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/reporting-task-configuration.jsp
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/reporting-task-configuration.jsp
@@ -20,40 +20,68 @@
<div id="reporting-task-configuration-tabs"></div>
<div id="reporting-task-configuration-tabs-content">
<div id="reporting-task-standard-settings-tab-content" class="configuration-tab">
- <div class="setting">
- <div class="setting-name">Name</div>
- <div class="setting-field">
- <input type="text" id="reporting-task-name" name="reporting-task-name"/>
- <div class="reporting-task-enabled-container">
- <div id="reporting-task-enabled" class="nf-checkbox checkbox-unchecked"></div>
- <span> Enabled</span>
- </div>
- </div>
- </div>
- <div class="setting">
- <div class="setting-name">Id</div>
- <div class="setting-field">
- <span id="reporting-task-id"></span>
- </div>
- </div>
- <div class="setting">
- <div class="setting-name">Type</div>
- <div class="setting-field">
- <span id="reporting-task-type"></span>
- </div>
- </div>
- <div id="availability-setting-container" class="setting hidden">
- <div class="availability-setting">
- <div class="setting-name">
- Availability
- <img class="setting-icon icon-info" src="images/iconInfo.png" alt="Info" title="Where this controller service is available."/>
- </div>
+ <div class="settings-left">
+ <div class="setting">
+ <div class="setting-name">Name</div>
<div class="setting-field">
- <div id="availability"></div>
+ <input type="text" id="reporting-task-name" name="reporting-task-name"/>
+ <div class="reporting-task-enabled-container">
+ <div id="reporting-task-enabled" class="nf-checkbox checkbox-unchecked"></div>
+ <span> Enabled</span>
+ </div>
</div>
</div>
- <div class="clear"></div>
+ <div class="setting">
+ <div class="setting-name">Id</div>
+ <div class="setting-field">
+ <span id="reporting-task-id"></span>
+ </div>
+ </div>
+ <div class="setting">
+ <div class="setting-name">Type</div>
+ <div class="setting-field">
+ <span id="reporting-task-type"></span>
+ </div>
+ </div>
+ <div id="reporting-task-availability-setting-container" class="setting hidden">
+ <div class="availability-setting">
+ <div class="setting-name">
+ Availability
+ <img class="setting-icon icon-info" src="images/iconInfo.png" alt="Info" title="Where this controller service is available."/>
+ </div>
+ <div class="setting-field">
+ <div id="availability"></div>
+ </div>
+ </div>
+ <div class="clear"></div>
+ </div>
</div>
+ <div class="spacer"> </div>
+ <div class="settings-right">
+ <div class="setting">
+ <div class="reporting-task-scheduling-strategy-container">
+ <div class="setting-name">
+ Scheduling strategy
+ <img class="setting-icon icon-info" src="images/iconInfo.png" alt="Info" title="The strategy used to schedule this reporting task."/>
+ </div>
+ <div class="setting-field">
+ <div type="text" id="reporting-task-scheduling-strategy-combo"></div>
+ </div>
+ </div>
+ <div class="reporting-task-scheduling-period-container">
+ <div class="setting-name">
+ Run schedule
+ <img class="setting-icon icon-info" src="images/iconInfo.png" alt="Info" title="The amount of time that should elapse between task executions."/>
+ </div>
+ <div class="setting-field">
+ <input type="text" id="reporting-task-timer-driven-scheduling-period" class="reporting-task-scheduling-period small-setting-input"/>
+ <input type="text" id="reporting-task-cron-driven-scheduling-period" class="reporting-task-scheduling-period small-setting-input"/>
+ </div>
+ </div>
+ <div class="clear"></div>
+ </div>
+ </div>
+ <div class="clear"></div>
</div>
<div id="reporting-task-properties-tab-content" class="configuration-tab">
<div id="reporting-task-properties"></div>
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/reporting-task.css b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/reporting-task.css
index 952aebd..50ecd41 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/reporting-task.css
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/reporting-task.css
@@ -55,6 +55,21 @@
/* reporting-task settings */
+#reporting-task-configuration div.settings-left {
+ float: left;
+ width: 356px;
+}
+
+#reporting-task-configuration div.settings-right {
+ float: left;
+ width: 356px;
+}
+
+#reporting-task-configuration div.spacer {
+ float: left;
+ margin-right: 40px;
+}
+
#reporting-task-name {
font-size: 11px !important;
width: 250px;
@@ -77,4 +92,19 @@
div.availability-setting {
float: left;
width: 140px;
+}
+
+div.reporting-task-scheduling-strategy-container, div.reporting-task-scheduling-period-container {
+ float: left;
+ width: 175px;
+}
+
+#reporting-task-scheduling-strategy-combo {
+ width: 145px;
+ height: 18px;
+ line-height: 18px;
+}
+
+input.reporting-task-scheduling-period {
+ font-size: 11px !important;
}
\ No newline at end of file
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
index dfe16c4..b570075 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
@@ -68,7 +68,9 @@
return true;
}
- if ($('#controller-service-enabled').hasClass('checkbox-checked')) {
+ if ($('#controller-service-enabled').hasClass('checkbox-checked') && details['state'] === 'DISABLED') {
+ return true;
+ } else if ($('#controller-service-enabled').hasClass('checkbox-unchecked') && details['state'] === 'ENABLED') {
return true;
}
@@ -94,7 +96,9 @@
}
// mark the controller service enabled if appropriate
- if ($('#controller-service-enabled').hasClass('checkbox-checked')) {
+ if ($('#controller-service-enabled').hasClass('checkbox-unchecked')) {
+ controllerServiceDto['state'] = 'DISABLED';
+ } else if ($('#controller-service-enabled').hasClass('checkbox-checked')) {
controllerServiceDto['state'] = 'ENABLED';
}
@@ -285,7 +289,7 @@
/**
* Adds the specified reference for this controller service.
*
- * @argument {jQuery} referenceContainer
+ * @param {jQuery} referenceContainer
* @param {array} referencingComponents
*/
var createReferencingComponents = function (referenceContainer, referencingComponents) {
@@ -445,11 +449,8 @@
// wait unil the polling of each service finished
return $.Deferred(function(deferred) {
- updated.done(function(response) {
+ updated.done(function() {
var serviceUpdated = pollService(controllerService, function (service) {
- // update the service in the table
- renderControllerService(service);
-
// the condition is met once the service is ENABLED/DISABLED
if (enabled) {
return service.state === 'ENABLED';
@@ -461,9 +462,6 @@
// once the service has updated, resolve and render the updated service
serviceUpdated.done(function () {
deferred.resolve();
-
- // update the service in the table
- renderControllerService(response.controllerService);
}).fail(function() {
deferred.reject();
});
@@ -505,9 +503,9 @@
* Updates the scheduled state of the processors/reporting tasks referencing
* the specified controller service.
*
- * @param {type} controllerService
- * @param {type} running
- * @param {type} pollCondition
+ * @param {object} controllerService
+ * @param {boolean} running
+ * @param {function} pollCondition
*/
var updateReferencingSchedulableComponents = function (controllerService, running, pollCondition) {
var revision = nf.Client.getRevision();
@@ -549,10 +547,9 @@
// start polling for each controller service
var polling = [];
services.forEach(function(controllerServiceId) {
- var controllerService = controllerServiceData.getItemById(controllerServiceId);
- polling.push(stopReferencingSchedulableComponents(controllerService, pollCondition));
+ var referencingService = controllerServiceData.getItemById(controllerServiceId);
+ polling.push(stopReferencingSchedulableComponents(referencingService, pollCondition));
});
-
}
$.when.apply(window, polling).done(function () {
@@ -577,7 +574,7 @@
var pollService = function (controllerService, completeCondition, pollCondition) {
// we want to keep polling until the condition is met
return $.Deferred(function(deferred) {
- var current = 1;
+ var current = 2;
var getTimeout = function () {
var val = current;
@@ -631,9 +628,6 @@
return pollService(controllerService, function (service) {
var referencingComponents = service.referencingComponents;
- // update the service in the table
- renderControllerService(service);
-
var stillRunning = false;
$.each(referencingComponents, function(_, referencingComponent) {
if (referencingComponent.referenceType === 'Processor' || referencingComponent.referenceType === 'ReportingTask') {
@@ -658,17 +652,14 @@
/**
* Continues to poll until all referencing services are enabled.
*
- * @param {type} controllerService
- * @param {type} pollCondition
+ * @param {object} controllerService
+ * @param {function} pollCondition
*/
var enableReferencingServices = function (controllerService, pollCondition) {
// continue to poll the service until all referencing services are enabled
return pollService(controllerService, function (service) {
var referencingComponents = service.referencingComponents;
- // update the service in the table
- renderControllerService(service);
-
var notEnabled = false;
$.each(referencingComponents, function(_, referencingComponent) {
if (referencingComponent.referenceType === 'ControllerService') {
@@ -690,17 +681,14 @@
/**
* Continues to poll until all referencing services are disabled.
*
- * @param {type} controllerService
- * @param {type} pollCondition
+ * @param {object} controllerService
+ * @param {function} pollCondition
*/
var disableReferencingServices = function (controllerService, pollCondition) {
// continue to poll the service until all referencing services are disabled
return pollService(controllerService, function (service) {
var referencingComponents = service.referencingComponents;
- // update the service in the table
- renderControllerService(service);
-
var notDisabled = false;
$.each(referencingComponents, function(_, referencingComponent) {
if (referencingComponent.referenceType === 'ControllerService') {
@@ -760,12 +748,12 @@
// start polling for each controller service
var polling = [];
services.forEach(function(controllerServiceId) {
- var controllerService = controllerServiceData.getItemById(controllerServiceId);
+ var referencingService = controllerServiceData.getItemById(controllerServiceId);
if (enabled) {
- polling.push(enableReferencingServices(controllerService, pollCondition));
+ polling.push(enableReferencingServices(referencingService, pollCondition));
} else {
- polling.push(disableReferencingServices(controllerService, pollCondition));
+ polling.push(disableReferencingServices(referencingService, pollCondition));
}
});
@@ -875,36 +863,41 @@
$('#disable-progress-label').text('Steps to disable ' + controllerService.name);
var disableReferencingSchedulable = $('#disable-referencing-schedulable').addClass('ajax-loading');
- // stop all referencing schedulable components
- var stopped = updateReferencingSchedulableComponents(controllerService, false, continuePolling);
+ $.Deferred(function (deferred) {
+ // stop all referencing schedulable components
+ var stopped = updateReferencingSchedulableComponents(controllerService, false, continuePolling);
- // once everything has stopped
- stopped.done(function () {
- disableReferencingSchedulable.removeClass('ajax-loading').addClass('ajax-complete');
- var disableReferencingServices = $('#disable-referencing-services').addClass('ajax-loading');
-
- // disable all referencing services
- var disabled = updateReferencingServices(controllerService, false, continuePolling);
+ // once everything has stopped
+ stopped.done(function () {
+ disableReferencingSchedulable.removeClass('ajax-loading').addClass('ajax-complete');
+ var disableReferencingServices = $('#disable-referencing-services').addClass('ajax-loading');
- // everything is disabled
- disabled.done(function () {
- disableReferencingServices.removeClass('ajax-loading').addClass('ajax-complete');
- var disableControllerService = $('#disable-controller-service').addClass('ajax-loading');
-
- // disable this service
- setEnabled(controllerService, false, continuePolling).done(function () {
- disableControllerService.removeClass('ajax-loading').addClass('ajax-complete');
+ // disable all referencing services
+ var disabled = updateReferencingServices(controllerService, false, continuePolling);
+
+ // everything is disabled
+ disabled.done(function () {
+ disableReferencingServices.removeClass('ajax-loading').addClass('ajax-complete');
+ var disableControllerService = $('#disable-controller-service').addClass('ajax-loading');
+
+ // disable this service
+ setEnabled(controllerService, false, continuePolling).done(function () {
+ deferred.resolve();
+ disableControllerService.removeClass('ajax-loading').addClass('ajax-complete');
+ }).fail(function () {
+ deferred.reject();
+ disableControllerService.removeClass('ajax-loading').addClass('ajax-error');
+ });
}).fail(function () {
- disableControllerService.removeClass('ajax-loading').addClass('ajax-error');
- }).always(function () {
- setCloseButton();
+ deferred.reject();
+ disableReferencingServices.removeClass('ajax-loading').addClass('ajax-error');
});
}).fail(function () {
- disableReferencingServices.removeClass('ajax-loading').addClass('ajax-error');
- setCloseButton();
+ deferred.reject();
+ disableReferencingSchedulable.removeClass('ajax-loading').addClass('ajax-error');
});
- }).fail(function () {
- disableReferencingSchedulable.removeClass('ajax-loading').addClass('ajax-error');
+ }).always(function () {
+ reloadControllerService(controllerService);
setCloseButton();
});
};
@@ -961,48 +954,53 @@
$('#enable-progress-label').text('Steps to enable ' + controllerService.name);
var enableControllerService = $('#enable-controller-service').addClass('ajax-loading');
- // enable this controller service
- var enabled = setEnabled(controllerService, true, continuePolling);
+ $.Deferred(function (deferred) {
+ // enable this controller service
+ var enable = setEnabled(controllerService, true, continuePolling);
- if (scope === config.serviceAndReferencingComponents) {
- // once the service is enabled, activate all referencing components
- enabled.done(function() {
- enableControllerService.removeClass('ajax-loading').addClass('ajax-complete');
- var enableReferencingServices = $('#enable-referencing-services').addClass('ajax-loading');
-
- // enable the referencing services
- var servicesEnabled = updateReferencingServices(controllerService, true, continuePolling);
+ if (scope === config.serviceAndReferencingComponents) {
+ // once the service is enabled, activate all referencing components
+ enable.done(function() {
+ enableControllerService.removeClass('ajax-loading').addClass('ajax-complete');
+ var enableReferencingServices = $('#enable-referencing-services').addClass('ajax-loading');
- // once all the referencing services are enbled
- servicesEnabled.done(function () {
- enableReferencingServices.removeClass('ajax-loading').addClass('ajax-complete');
- var enableReferencingSchedulable = $('#enable-referencing-schedulable').addClass('ajax-loading');
-
- // start all referencing schedulable components
- updateReferencingSchedulableComponents(controllerService, true, continuePolling).done(function() {
- enableReferencingSchedulable.removeClass('ajax-loading').addClass('ajax-complete');
+ // enable the referencing services
+ var servicesEnabled = updateReferencingServices(controllerService, true, continuePolling);
+
+ // once all the referencing services are enbled
+ servicesEnabled.done(function () {
+ enableReferencingServices.removeClass('ajax-loading').addClass('ajax-complete');
+ var enableReferencingSchedulable = $('#enable-referencing-schedulable').addClass('ajax-loading');
+
+ // start all referencing schedulable components
+ updateReferencingSchedulableComponents(controllerService, true, continuePolling).done(function() {
+ deferred.resolve();
+ enableReferencingSchedulable.removeClass('ajax-loading').addClass('ajax-complete');
+ }).fail(function () {
+ deferred.reject();
+ enableReferencingSchedulable.removeClass('ajax-loading').addClass('ajax-error');
+ });
}).fail(function () {
- enableReferencingSchedulable.removeClass('ajax-loading').addClass('ajax-error');
- }).always(function () {
- setCloseButton();
+ deferred.reject();
+ enableReferencingServices.removeClass('ajax-loading').addClass('ajax-error');
});
}).fail(function () {
- enableReferencingServices.removeClass('ajax-loading').addClass('ajax-error');
- setCloseButton();
+ deferred.reject();
+ enableControllerService.removeClass('ajax-loading').addClass('ajax-error');
});
- }).fail(function () {
- enableControllerService.removeClass('ajax-loading').addClass('ajax-error');
- setCloseButton();
- });
- } else {
- enabled.done(function() {
- enableControllerService.removeClass('ajax-loading').addClass('ajax-complete');
- }).fail(function () {
- enableControllerService.removeClass('ajax-loading').addClass('ajax-error');
- }).always(function () {
- setCloseButton();
- });
- }
+ } else {
+ enable.done(function() {
+ deferred.resolve();
+ enableControllerService.removeClass('ajax-loading').addClass('ajax-complete');
+ }).fail(function () {
+ deferred.reject();
+ enableControllerService.removeClass('ajax-loading').addClass('ajax-error');
+ });
+ }
+ }).always(function () {
+ reloadControllerService(controllerService);
+ setCloseButton();
+ });
};
return {
@@ -1041,7 +1039,7 @@
// we clustered we need to show the controls for editing the availability
if (nf.Canvas.isClustered()) {
- $('#availability-setting-container').show();
+ $('#controller-service-availability-setting-container').show();
}
// initialize the conroller service configuration dialog
@@ -1204,7 +1202,7 @@
/**
* Shows the configuration dialog for the specified controller service.
*
- * @argument {controllerService} controllerService The controller service
+ * @argument {object} controllerService The controller service
*/
showConfiguration: function (controllerService) {
// reload the service in case the property descriptors have changed
@@ -1232,11 +1230,17 @@
// record the controller service details
$('#controller-service-configuration').data('controllerServiceDetails', controllerService);
+ // determine if the enabled checkbox is checked or not
+ var controllerServiceEnableStyle = 'checkbox-checked';
+ if (controllerService['state'] === 'DISABLED') {
+ controllerServiceEnableStyle = 'checkbox-unchecked';
+ }
+
// populate the controller service settings
$('#controller-service-id').text(controllerService['id']);
$('#controller-service-type').text(nf.Common.substringAfterLast(controllerService['type'], '.'));
$('#controller-service-name').val(controllerService['name']);
- $('#controller-service-enabled').removeClass('checkbox-checked checkbox-unchecked').addClass('checkbox-unchecked');
+ $('#controller-service-enabled').removeClass('checkbox-checked checkbox-unchecked').addClass(controllerServiceEnableStyle);
$('#controller-service-comments').val(controllerService['comments']);
// select the availability when appropriate
@@ -1384,7 +1388,9 @@
*/
enable: function(controllerService) {
if (nf.Common.isEmpty(controllerService.referencingComponents)) {
- setEnabled(controllerService, true);
+ setEnabled(controllerService, true).always(function () {
+ reloadControllerService(controllerService);
+ });
} else {
showEnableControllerServiceDialog(controllerService);
}
@@ -1397,7 +1403,9 @@
*/
disable: function(controllerService) {
if (nf.Common.isEmpty(controllerService.referencingComponents)) {
- setEnabled(controllerService, false);
+ setEnabled(controllerService, false).always(function () {
+ reloadControllerService(controllerService);
+ });
} else {
showDisableControllerServiceDialog(controllerService);
}
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-reporting-task.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-reporting-task.js
index b4ce08e..fec1d10 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-reporting-task.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-reporting-task.js
@@ -59,8 +59,28 @@
if ($('#reporting-task-name').val() !== details.name) {
return true;
}
+ if ($('#reporting-task-enabled').hasClass('checkbox-checked') && details['state'] === 'DISABLED') {
+ return true;
+ } else if ($('#reporting-task-enabled').hasClass('checkbox-unchecked') && (details['state'] === 'RUNNING' || details['state'] === 'STOPPED')) {
+ return true;
+ }
- if ($('#reporting-task-enabled').hasClass('checkbox-checked')) {
+ // consider the scheduling strategy
+ var schedulingStrategy = $('#reporting-task-scheduling-strategy-combo').combo('getSelectedOption').value;
+ if (schedulingStrategy !== (details['schedulingStrategy'] + '')) {
+ return true;
+ }
+
+ // get the appropriate scheduling period field
+ var schedulingPeriod;
+ if (schedulingStrategy === 'CRON_DRIVEN') {
+ schedulingPeriod = $('#reporting-task-cron-driven-scheduling-period');
+ } else {
+ schedulingPeriod = $('#reporting-task-timer-driven-scheduling-period');
+ }
+
+ // check the scheduling period
+ if (nf.Common.isDefinedAndNotNull(schedulingPeriod) && schedulingPeriod.val() !== (details['schedulingPeriod'] + '')) {
return true;
}
@@ -75,10 +95,30 @@
// properties
var properties = $('#reporting-task-properties').propertytable('marshalProperties');
+ // get the scheduling strategy
+ var schedulingStrategy = $('#reporting-task-scheduling-strategy-combo').combo('getSelectedOption').value;
+
+ // get the appropriate scheduling period field
+ var schedulingPeriod;
+ if (schedulingStrategy === 'CRON_DRIVEN') {
+ schedulingPeriod = $('#reporting-task-cron-driven-scheduling-period');
+ } else {
+ schedulingPeriod = $('#reporting-task-timer-driven-scheduling-period');
+ }
+
// create the reporting task dto
var reportingTaskDto = {};
reportingTaskDto['id'] = $('#reporting-task-id').text();
reportingTaskDto['name'] = $('#reporting-task-name').val();
+ reportingTaskDto['schedulingStrategy'] = schedulingStrategy;
+ reportingTaskDto['schedulingPeriod'] = schedulingPeriod.val();
+
+ // mark the processor disabled if appropriate
+ if ($('#reporting-task-enabled').hasClass('checkbox-unchecked')) {
+ reportingTaskDto['state'] = 'DISABLED';
+ } else if ($('#reporting-task-enabled').hasClass('checkbox-checked')) {
+ reportingTaskDto['state'] = 'STOPPED';
+ }
// set the properties
if ($.isEmptyObject(properties) === false) {
@@ -100,7 +140,23 @@
* @argument {object} details The details to validate
*/
var validateDetails = function (details) {
- return true;
+ var errors = [];
+ var reportingTask = details['reportingTask'];
+
+ if (nf.Common.isBlank(reportingTask['schedulingPeriod'])) {
+ errors.push('Run schedule must be specified');
+ }
+
+ if (errors.length > 0) {
+ nf.Dialog.showOkDialog({
+ dialogContent: nf.Common.formatUnorderedList(errors),
+ overlayBackground: false,
+ headerText: 'Configuration Error'
+ });
+ return false;
+ } else {
+ return true;
+ }
};
/**
@@ -184,7 +240,7 @@
// we clustered we need to show the controls for editing the availability
if (nf.Canvas.isClustered()) {
- $('#availability-setting-container').show();
+ $('#reporting-task-availability-setting-container').show();
}
// initialize the reporting task configuration dialog
@@ -246,10 +302,17 @@
// record the reporting task details
$('#reporting-task-configuration').data('reportingTaskDetails', reportingTask);
+ // determine if the enabled checkbox is checked or not
+ var reportingTaskEnableStyle = 'checkbox-checked';
+ if (reportingTask['state'] === 'DISABLED') {
+ reportingTaskEnableStyle = 'checkbox-unchecked';
+ }
+
// populate the reporting task settings
$('#reporting-task-id').text(reportingTask['id']);
$('#reporting-task-type').text(nf.Common.substringAfterLast(reportingTask['type'], '.'));
$('#reporting-task-name').val(reportingTask['name']);
+ $('#reporting-task-enabled').removeClass('checkbox-unchecked checkbox-checked').addClass(reportingTaskEnableStyle);
// select the availability when appropriate
if (nf.Canvas.isClustered()) {
@@ -260,6 +323,43 @@
}
}
+ // get the default schedule period
+ var defaultSchedulingPeriod = reportingTask['defaultSchedulingPeriod'];
+ var cronSchedulingPeriod = $('#reporting-task-cron-driven-scheduling-period').val(defaultSchedulingPeriod['CRON_DRIVEN']);
+ var timerSchedulingPeriod = $('#reporting-task-timer-driven-scheduling-period').val(defaultSchedulingPeriod['TIMER_DRIVEN']);
+
+ // set the scheduling period as appropriate
+ if (reportingTask['schedulingStrategy'] === 'CRON_DRIVEN') {
+ cronSchedulingPeriod.val(reportingTask['schedulingPeriod']);
+ } else {
+ timerSchedulingPeriod.val(reportingTask['schedulingPeriod']);
+ }
+
+ // initialize the scheduling strategy
+ $('#reporting-task-scheduling-strategy-combo').combo({
+ options: [{
+ text: 'Timer driven',
+ value: 'TIMER_DRIVEN',
+ description: 'Reporting task will be scheduled to run on an interval defined by the run schedule.'
+ }, {
+ text: 'CRON driven',
+ value: 'CRON_DRIVEN',
+ description: 'Reporting task will be scheduled to run on at specific times based on the specified CRON string.'
+ }],
+ selectedOption: {
+ value: reportingTask['schedulingStrategy']
+ },
+ select: function (selectedOption) {
+ if (selectedOption.value === 'CRON_DRIVEN') {
+ timerSchedulingPeriod.hide();
+ cronSchedulingPeriod.show();
+ } else {
+ timerSchedulingPeriod.show();
+ cronSchedulingPeriod.hide();
+ }
+ }
+ });
+
var buttons = [{
buttonText: 'Apply',
handler: {
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
index 0353e27..cf29730 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
@@ -747,12 +747,33 @@
return markup;
};
+ var controllerServiceActionFormatter = function (row, cell, value, columnDef, dataContext) {
+ var markup = '';
+
+ // only DFMs can edit a controller service
+ if (nf.Common.isDFM()) {
+ if (dataContext.state === 'ENABLED' || dataContext.state === 'ENABLING') {
+ markup += '<img src="images/iconDisable.png" title="Disable" class="pointer disable-controller-service" style="margin-top: 2px;" /> ';
+ } else if (dataContext.state === 'DISABLED') {
+ markup += '<img src="images/iconEdit.png" title="Edit" class="pointer edit-controller-service" style="margin-top: 2px;" /> ';
+ markup += '<img src="images/iconEnable.png" title="Enable" class="pointer enable-controller-service" style="margin-top: 2px;"/> ';
+ markup += '<img src="images/iconDelete.png" title="Remove" class="pointer delete-controller-service" style="margin-top: 2px;" /> ';
+ }
+ }
+
+ // always include a button to view the usage
+ markup += '<img src="images/iconUsage.png" title="Usage" class="pointer controller-service-usage" style="margin-top: 2px;"/> ';
+
+ return markup;
+ };
+
// define the column model for the controller services table
var controllerServicesColumns = [
{id: 'moreDetails', name: ' ', resizable: false, formatter: moreControllerServiceDetails, sortable: false, width: 50, maxWidth: 50},
{id: 'name', field: 'name', name: 'Name', sortable: true, resizable: true},
{id: 'type', field: 'type', name: 'Type', formatter: typeFormatter, sortable: true, resizable: true},
- {id: 'state', field: 'state', name: 'State', sortable: true, resizeable: true}
+ {id: 'state', field: 'state', name: 'State', sortable: true, resizeable: true},
+ {id: 'actions', name: ' ', resizable: false, formatter: controllerServiceActionFormatter, sortable: false, width: 90, maxWidth: 90}
];
// only show availability when clustered
@@ -760,30 +781,6 @@
controllerServicesColumns.push({id: 'availability', field: 'availability', name: 'Availability', formatter: availabilityFormatter, sortable: true, resizeable: true});
}
- // only DFM can edit controller services
- if (nf.Common.isDFM()) {
- var controllerServiceActionFormatter = function (row, cell, value, columnDef, dataContext) {
- var markup = '';
-
- if (dataContext.state === 'ENABLED' || dataContext.state === 'ENABLING') {
- markup += '<img src="images/iconDisable.png" title="Disable" class="pointer disable-controller-service" style="margin-top: 2px;" /> ';
- } else if (dataContext.state === 'DISABLED') {
- markup += '<img src="images/iconEdit.png" title="Edit" class="pointer edit-controller-service" style="margin-top: 2px;" /> ';
-
- // only enable the enable icon if the service has no validation errors
- if (nf.Common.isEmpty(dataContext.validationErrors)) {
- markup += '<img src="images/iconEnable.png" title="Enable" class="pointer enable-controller-service" style="margin-top: 2px;"/> ';
- }
-
- markup += '<img src="images/iconDelete.png" title="Remove" class="pointer delete-controller-service" style="margin-top: 2px;" /> ';
- }
-
- return markup;
- };
-
- controllerServicesColumns.push({id: 'actions', name: ' ', resizable: false, formatter: controllerServiceActionFormatter, sortable: false, width: 75, maxWidth: 75});
- }
-
// initialize the dataview
var controllerServicesData = new Slick.Data.DataView({
inlineFilters: false
@@ -825,6 +822,14 @@
nf.ControllerService.disable(controllerService);
} else if (target.hasClass('delete-controller-service')) {
nf.ControllerService.remove(controllerService);
+ } else if (target.hasClass('controller-service-usage')) {
+ // close the settings dialog
+ $('#shell-close-button').click();
+
+ // open the documentation for this reporting task
+ nf.Shell.showPage('../nifi-docs/documentation?' + $.param({
+ select: nf.Common.substringAfterLast(controllerService.type, '.')
+ }));
}
} else if (controllerServicesGrid.getColumns()[args.cell].id === 'moreDetails') {
if (target.hasClass('view-controller-service')) {
@@ -1294,13 +1299,60 @@
}
return markup;
};
+
+ var reportingTaskRunStatusFormatter = function (row, cell, value, columnDef, dataContext) {
+ // determine the appropriate label
+ var label;
+ if (!nf.Common.isEmpty(dataContext.validationErrors)) {
+ label = 'Invalid';
+ } else {
+ if (value === 'STOPPED') {
+ label = 'Stopped';
+ } else if (value === 'RUNNING') {
+ label = 'Running';
+ } else {
+ label = 'Disabled';
+ }
+ }
+
+ // include the active thread count if appropriate
+ var activeThreadCount = '';
+ if (nf.Common.isDefinedAndNotNull(dataContext.activeThreadCount) && dataContext.activeThreadCount > 0) {
+ activeThreadCount = '(' + dataContext.activeThreadCount + ')';
+ }
+
+ // format the markup
+ var formattedValue = '<div class="' + nf.Common.escapeHtml(label.toLowerCase()) + '" style="margin-top: 3px;"></div>';
+ return formattedValue + '<div class="status-text" style="margin-top: 2px; margin-left: 4px; float: left;">' + nf.Common.escapeHtml(label) + '</div><div style="float: left; margin-left: 4px;">' + nf.Common.escapeHtml(activeThreadCount) + '</div>';
+ };
+
+ var reportingTaskActionFormatter = function (row, cell, value, columnDef, dataContext) {
+ var markup = '';
+
+ // only DFMs can edit reporting tasks
+ if (nf.Common.isDFM()) {
+ if (dataContext.state === 'RUNNING') {
+ markup += '<img src="images/iconStop.png" title="Stop" class="pointer stop-reporting-task" style="margin-top: 2px;" /> ';
+ } else if (dataContext.state === 'STOPPED' || dataContext.state === 'DISABLED') {
+ markup += '<img src="images/iconEdit.png" title="Edit" class="pointer edit-reporting-task" style="margin-top: 2px;" /> ';
+ markup += '<img src="images/iconRun.png" title="Start" class="pointer start-reporting-task" style="margin-top: 2px;"/> ';
+ markup += '<img src="images/iconDelete.png" title="Remove" class="pointer delete-reporting-task" style="margin-top: 2px;" /> ';
+ }
+ }
+
+ // always include a button to view the usage
+ markup += '<img src="images/iconUsage.png" title="Usage" class="pointer reporting-task-usage" style="margin-top: 2px;"/> ';
+
+ return markup;
+ };
// define the column model for the reporting tasks table
var reportingTasksColumnModel = [
{id: 'moreDetails', field: 'moreDetails', name: ' ', resizable: false, formatter: moreReportingTaskDetails, sortable: true, width: 50, maxWidth: 50},
{id: 'name', field: 'name', name: 'Name', sortable: true, resizable: true},
{id: 'type', field: 'type', name: 'Type', sortable: true, resizable: true, formatter: typeFormatter},
- {id: 'state', field: 'state', name: 'State', sortable: true, resizeable: true}
+ {id: 'state', field: 'state', name: 'State', sortable: true, resizeable: true, formatter: reportingTaskRunStatusFormatter},
+ {id: 'actions', name: ' ', resizable: false, formatter: reportingTaskActionFormatter, sortable: false, width: 90, maxWidth: 90}
];
// only show availability when clustered
@@ -1308,30 +1360,6 @@
reportingTasksColumnModel.push({id: 'availability', field: 'availability', name: 'Availability', formatter: availabilityFormatter, sortable: true, resizeable: true});
}
- // only DFM can edit reporting tasks
- if (nf.Common.isDFM()) {
- var reportingTaskActionFormatter = function (row, cell, value, columnDef, dataContext) {
- var markup = '';
-
- if (dataContext.state === 'RUNNING') {
- markup += '<img src="images/iconStop.png" title="Stop" class="pointer stop-reporting-task" style="margin-top: 2px;" /> ';
- } else if (dataContext.state === 'STOPPED') {
- markup += '<img src="images/iconEdit.png" title="Edit" class="pointer edit-reporting-task" style="margin-top: 2px;" /> ';
-
- // only enable the start icon if the reporting task has no validation errors
- if (nf.Common.isEmpty(dataContext.validationErrors)) {
- markup += '<img src="images/iconRun.png" title="Start" class="pointer start-reporting-task" style="margin-top: 2px;"/> ';
- }
-
- markup += '<img src="images/iconDelete.png" title="Remove" class="pointer delete-reporting-task" style="margin-top: 2px;" /> ';
- }
-
- return markup;
- };
-
- reportingTasksColumnModel.push({id: 'actions', name: ' ', resizable: false, formatter: reportingTaskActionFormatter, sortable: false, width: 75, maxWidth: 75});
- }
-
// initialize the dataview
var reportingTasksData = new Slick.Data.DataView({
inlineFilters: false
@@ -1373,6 +1401,14 @@
nf.ReportingTask.stop(reportingTask);
} else if (target.hasClass('delete-reporting-task')) {
nf.ReportingTask.remove(reportingTask);
+ } else if (target.hasClass('reporting-task-usage')) {
+ // close the settings dialog
+ $('#shell-close-button').click();
+
+ // open the documentation for this reporting task
+ nf.Shell.showPage('../nifi-docs/documentation?' + $.param({
+ select: nf.Common.substringAfterLast(reportingTask.type, '.')
+ }));
}
} else if (reportingTasksGrid.getColumns()[args.cell].id === 'moreDetails') {
if (target.hasClass('view-reporting-task')) {