blob: 139b442134a7a5369a94951118b3227e8de7c894 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.reporting;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractConfiguredComponent;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
public abstract class AbstractReportingTaskNode extends AbstractConfiguredComponent implements ReportingTaskNode {
private final ReportingTask reportingTask;
private final ProcessScheduler processScheduler;
private final ControllerServiceLookup serviceLookup;
private final AtomicReference<SchedulingStrategy> schedulingStrategy = new AtomicReference<>(SchedulingStrategy.TIMER_DRIVEN);
private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("5 mins");
private volatile String comment;
private volatile ScheduledState scheduledState = ScheduledState.STOPPED;
protected final VariableRegistry variableRegistry;
public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id,
final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
final ValidationContextFactory validationContextFactory, final VariableRegistry variableRegistry) {
this(reportingTask, id, controllerServiceProvider, processScheduler, validationContextFactory,
reportingTask.getClass().getSimpleName(), reportingTask.getClass().getCanonicalName(),variableRegistry);
}
public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id,
final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
final ValidationContextFactory validationContextFactory,
final String componentType, final String componentCanonicalClass, VariableRegistry variableRegistry) {
super(reportingTask, id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass);
this.reportingTask = reportingTask;
this.processScheduler = processScheduler;
this.serviceLookup = controllerServiceProvider;
this.variableRegistry = variableRegistry;
}
@Override
public void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) {
this.schedulingStrategy.set(schedulingStrategy);
}
@Override
public SchedulingStrategy getSchedulingStrategy() {
return schedulingStrategy.get();
}
@Override
public String getSchedulingPeriod() {
return schedulingPeriod.get();
}
@Override
public long getSchedulingPeriod(final TimeUnit timeUnit) {
return FormatUtils.getTimeDuration(schedulingPeriod.get(), timeUnit);
}
@Override
public void setScheduldingPeriod(final String schedulingPeriod) {
this.schedulingPeriod.set(schedulingPeriod);
}
@Override
public ReportingTask getReportingTask() {
return reportingTask;
}
@Override
public boolean isRunning() {
return processScheduler.isScheduled(this) || processScheduler.getActiveThreadCount(this) > 0;
}
@Override
public int getActiveThreadCount() {
return processScheduler.getActiveThreadCount(this);
}
@Override
public ConfigurationContext getConfigurationContext() {
return new StandardConfigurationContext(this, serviceLookup, getSchedulingPeriod(), variableRegistry);
}
@Override
public void verifyModifiable() throws IllegalStateException {
if (isRunning()) {
throw new IllegalStateException("Cannot modify Reporting Task while the Reporting Task is running");
}
}
@Override
public ScheduledState getScheduledState() {
return scheduledState;
}
@Override
public void setScheduledState(final ScheduledState state) {
this.scheduledState = state;
}
@Override
public void setProperty(final String name, final String value) {
super.setProperty(name, value);
}
@Override
public boolean removeProperty(String name) {
return super.removeProperty(name);
}
public boolean isDisabled() {
return scheduledState == ScheduledState.DISABLED;
}
@Override
public String getComments() {
return comment;
}
@Override
public void setComments(final String comment) {
this.comment = comment;
}
@Override
public void verifyCanDelete() {
if (isRunning()) {
throw new IllegalStateException("Cannot delete " + reportingTask.getIdentifier() + " because it is currently running");
}
}
@Override
public void verifyCanDisable() {
if (isRunning()) {
throw new IllegalStateException("Cannot disable " + reportingTask.getIdentifier() + " because it is currently running");
}
if (isDisabled()) {
throw new IllegalStateException("Cannot disable " + reportingTask.getIdentifier() + " because it is already disabled");
}
}
@Override
public void verifyCanEnable() {
if (!isDisabled()) {
throw new IllegalStateException("Cannot enable " + reportingTask.getIdentifier() + " because it is not disabled");
}
}
@Override
public void verifyCanStart() {
if (isDisabled()) {
throw new IllegalStateException("Cannot start " + reportingTask.getIdentifier() + " because it is currently disabled");
}
if (isRunning()) {
throw new IllegalStateException("Cannot start " + reportingTask.getIdentifier() + " because it is already running");
}
}
@Override
public void verifyCanStop() {
if (!isRunning()) {
throw new IllegalStateException("Cannot stop " + reportingTask.getIdentifier() + " because it is not running");
}
}
@Override
public void verifyCanUpdate() {
if (isRunning()) {
throw new IllegalStateException("Cannot update " + reportingTask.getIdentifier() + " because it is currently running");
}
}
@Override
public void verifyCanClearState() {
verifyCanUpdate();
}
@Override
public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
switch (getScheduledState()) {
case DISABLED:
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is disabled");
case RUNNING:
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is already running");
case STOPPED:
break;
}
final int activeThreadCount = getActiveThreadCount();
if (activeThreadCount > 0) {
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it has " + activeThreadCount + " active threads already");
}
final Set<String> ids = new HashSet<>();
for (final ControllerServiceNode node : ignoredReferences) {
ids.add(node.getIdentifier());
}
final Collection<ValidationResult> validationResults = getValidationErrors(ids);
for (final ValidationResult result : validationResults) {
if (!result.isValid()) {
throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is not valid: " + result);
}
}
}
@Override
public String toString() {
return "ReportingTask[id=" + getIdentifier() + "]";
}
@Override
protected String getProcessGroupIdentifier() {
return null;
}
}