blob: 6b3cc95700b66e4b8b265eb43bd17c201e24b752 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.controller.reporting;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractConfiguredComponent;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.ReflectionUtils;
/**
 * Base {@link ReportingTaskNode} implementation that wraps a {@link ReportingTask}
 * together with its scheduling strategy, scheduling period, comments and
 * scheduled state.
 */
public abstract class AbstractReportingTaskNode extends AbstractConfiguredComponent implements ReportingTaskNode {
// The wrapped Reporting Task instance.
private final ReportingTask reportingTask;
// Queried for whether this node is scheduled and for its active thread count.
private final ProcessScheduler processScheduler;
// Lookup handed to every StandardConfigurationContext built by this node.
private final ControllerServiceLookup serviceLookup;
// Current scheduling strategy; starts out as TIMER_DRIVEN.
private final AtomicReference<SchedulingStrategy> schedulingStrategy = new AtomicReference<>(SchedulingStrategy.TIMER_DRIVEN);
// Current scheduling period as a duration string; starts out as "1 mins".
private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("1 mins");
// Free-form comments attached to this Reporting Task; null until set.
private volatile String comment;
// Last scheduled state recorded via setScheduledState; starts out as STOPPED.
private volatile ScheduledState scheduledState = ScheduledState.STOPPED;
/**
 * Creates a node wrapping the given Reporting Task.
 *
 * @param reportingTask the Reporting Task to wrap
 * @param id the identifier passed on to the superclass
 * @param controllerServiceProvider passed on to the superclass and also kept as the
 *        lookup used when building configuration contexts
 * @param processScheduler scheduler consulted for scheduled status and active thread counts
 * @param validationContextFactory passed on to the superclass
 */
public AbstractReportingTaskNode(
        final ReportingTask reportingTask,
        final String id,
        final ControllerServiceProvider controllerServiceProvider,
        final ProcessScheduler processScheduler,
        final ValidationContextFactory validationContextFactory) {

    super(reportingTask, id, validationContextFactory, controllerServiceProvider);

    // The three assignments are independent of one another.
    this.serviceLookup = controllerServiceProvider;
    this.processScheduler = processScheduler;
    this.reportingTask = reportingTask;
}
public void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) {
public SchedulingStrategy getSchedulingStrategy() {
return schedulingStrategy.get();
public String getSchedulingPeriod() {
return schedulingPeriod.get();
public long getSchedulingPeriod(final TimeUnit timeUnit) {
return FormatUtils.getTimeDuration(schedulingPeriod.get(), timeUnit);
public void setScheduldingPeriod(final String schedulingPeriod) {
public ReportingTask getReportingTask() {
return reportingTask;
public boolean isRunning() {
return processScheduler.isScheduled(this) || processScheduler.getActiveThreadCount(this) > 0;
public int getActiveThreadCount() {
return processScheduler.getActiveThreadCount(this);
public ConfigurationContext getConfigurationContext() {
return new StandardConfigurationContext(this, serviceLookup);
public void verifyModifiable() throws IllegalStateException {
if (isRunning()) {
throw new IllegalStateException("Cannot modify Reporting Task while the Reporting Task is running");
public ScheduledState getScheduledState() {
return scheduledState;
public void setScheduledState(final ScheduledState state) {
this.scheduledState = state;
public void setProperty(final String name, final String value) {
super.setProperty(name, value);
public boolean removeProperty(String name) {
final boolean removed = super.removeProperty(name);
if ( removed ) {
return removed;
private void onConfigured() {
// We need to invoke any method annotation with the OnConfigured annotation in order to
// maintain backward compatibility. This will be removed when we remove the old, deprecated annotations.
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceLookup);
ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, configContext);
} catch (final Exception e) {
throw new ProcessorLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + reportingTask, e);
public boolean isDisabled() {
return scheduledState == ScheduledState.DISABLED;
public String getComments() {
return comment;
public void setComments(final String comment) {
this.comment = comment;
public void verifyCanDelete() {
if (isRunning()) {
throw new IllegalStateException("Cannot delete " + reportingTask + " because it is currently running");
public void verifyCanDisable() {
if ( isRunning() ) {
throw new IllegalStateException("Cannot disable " + reportingTask + " because it is currently running");
if ( isDisabled() ) {
throw new IllegalStateException("Cannot disable " + reportingTask + " because it is already disabled");
public void verifyCanEnable() {
if ( !isDisabled() ) {
throw new IllegalStateException("Cannot enable " + reportingTask + " because it is not disabled");
public void verifyCanStart() {
if ( isDisabled() ) {
throw new IllegalStateException("Cannot start " + reportingTask + " because it is currently disabled");
if ( isRunning() ) {
throw new IllegalStateException("Cannot start " + reportingTask + " because it is already running");
public void verifyCanStop() {
if ( !isRunning() ) {
throw new IllegalStateException("Cannot stop " + reportingTask + " because it is not running");
public void verifyCanUpdate() {
if ( isRunning() ) {
throw new IllegalStateException("Cannot update " + reportingTask + " because it is currently running");
public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
switch (getScheduledState()) {
throw new IllegalStateException(this + " cannot be started because it is disabled");
throw new IllegalStateException(this + " cannot be started because it is already running");
final int activeThreadCount = getActiveThreadCount();
if ( activeThreadCount > 0 ) {
throw new IllegalStateException(this + " cannot be started because it has " + activeThreadCount + " active threads already");
final Set<String> ids = new HashSet<>();
for ( final ControllerServiceNode node : ignoredReferences ) {
final Collection<ValidationResult> validationResults = getValidationErrors(ids);
for ( final ValidationResult result : validationResults ) {
if ( !result.isValid() ) {
throw new IllegalStateException(this + " cannot be started because it is not valid: " + result);