// blob: 58671d40f025a88da7b31b84c39309ae158ea762 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.service;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractConfiguredComponent;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * {@link ControllerServiceNode} that wraps a {@link ControllerService} implementation together with
 * its proxy and tracks the service's lifecycle state.
 * <p>
 * The lifecycle state is held in an {@link AtomicReference} that starts out as
 * {@link ControllerServiceState#DISABLED} and is moved by
 * {@link #enable(ScheduledExecutorService, long)} and {@link #disable(ScheduledExecutorService)}.
 * The comments, the owning process group and the set of referencing components are guarded by a
 * read/write lock.
 */
public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode {

    private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class);

    private final ControllerService proxiedControllerService;
    private final ControllerService implementation;
    private final ControllerServiceProvider serviceProvider;
    private final VariableRegistry variableRegistry;

    /** Current lifecycle state; only ever changed through CAS or set operations on this reference. */
    private final AtomicReference<ControllerServiceState> stateRef = new AtomicReference<>(ControllerServiceState.DISABLED);

    /** Guards {@link #referencingComponents}, {@link #comment} and {@link #processGroup}. */
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();

    private final Set<ConfiguredComponent> referencingComponents = new HashSet<>();
    private String comment;
    private ProcessGroup processGroup;

    /**
     * Set to {@code true} when enabling starts and to {@code false} when disabling is requested.
     * The instance is also used as the monitor that orders a disable request against the
     * ENABLING to ENABLED transition performed by the enabling task.
     */
    private final AtomicBoolean active;

    /**
     * Creates a node whose component type and canonical class name are taken from the class of
     * {@code implementation}.
     *
     * @param proxiedControllerService the proxy returned by {@link #getProxiedControllerService()}
     * @param implementation the service implementation whose lifecycle methods are invoked
     * @param id the component identifier passed to the superclass
     * @param validationContextFactory passed to the superclass
     * @param serviceProvider used to look up other controller services
     * @param variableRegistry handed to the configuration context created when enabling/disabling
     */
    public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
                                         final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
                                         final VariableRegistry variableRegistry) {
        this(proxiedControllerService, implementation, id, validationContextFactory, serviceProvider,
            implementation.getClass().getSimpleName(), implementation.getClass().getCanonicalName(), variableRegistry);
    }

    /**
     * Creates a node with an explicitly supplied component type and canonical class name.
     *
     * @param proxiedControllerService the proxy returned by {@link #getProxiedControllerService()}
     * @param implementation the service implementation whose lifecycle methods are invoked
     * @param id the component identifier passed to the superclass
     * @param validationContextFactory passed to the superclass
     * @param serviceProvider used to look up other controller services
     * @param componentType passed to the superclass
     * @param componentCanonicalClass passed to the superclass
     * @param variableRegistry handed to the configuration context created when enabling/disabling
     */
    public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
                                         final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
                                         final String componentType, final String componentCanonicalClass, VariableRegistry variableRegistry) {
        super(implementation, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass);
        this.proxiedControllerService = proxiedControllerService;
        this.implementation = implementation;
        this.serviceProvider = serviceProvider;
        this.active = new AtomicBoolean();
        this.variableRegistry = variableRegistry;
    }

    /**
     * Returns the owning process group if one has been set. Otherwise returns an
     * {@link Authorizable} that has no parent and whose resource is the controller resource.
     */
    @Override
    public Authorizable getParentAuthorizable() {
        final ProcessGroup owningGroup = getProcessGroup();
        if (owningGroup != null) {
            return owningGroup;
        }

        return new Authorizable() {
            @Override
            public Authorizable getParentAuthorizable() {
                return null;
            }

            @Override
            public Resource getResource() {
                return ResourceFactory.getControllerResource();
            }
        };
    }

    /**
     * Returns the component resource for this controller service, built from its identifier and name.
     */
    @Override
    public Resource getResource() {
        return ResourceFactory.getComponentResource(ResourceType.ControllerService, getIdentifier(), getName());
    }

    /**
     * Returns the proxy that was supplied at construction time.
     */
    @Override
    public ControllerService getProxiedControllerService() {
        return proxiedControllerService;
    }

    /**
     * Returns the service implementation that was supplied at construction time.
     */
    @Override
    public ControllerService getControllerServiceImplementation() {
        return implementation;
    }

    /**
     * Returns the process group this service belongs to, or {@code null} if none has been set.
     */
    @Override
    public ProcessGroup getProcessGroup() {
        readLock.lock();
        try {
            return processGroup;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Sets the process group this service belongs to.
     *
     * @param group the owning group; {@code null} is stored as-is
     */
    @Override
    public void setProcessGroup(final ProcessGroup group) {
        writeLock.lock();
        try {
            this.processGroup = group;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Returns a reference object describing the components that currently reference this service.
     */
    @Override
    public ControllerServiceReference getReferences() {
        readLock.lock();
        try {
            // Hand out a snapshot taken under the lock so that the returned object never depends on
            // the live set, which addReference/removeReference may mutate concurrently.
            return new StandardControllerServiceReference(this, new HashSet<>(referencingComponents));
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Records that the given component references this service.
     */
    @Override
    public void addReference(final ConfiguredComponent referencingComponent) {
        writeLock.lock();
        try {
            referencingComponents.add(referencingComponent);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Collects the controller services this service depends on, directly or transitively.
     * <p>
     * A property contributes a dependency when its descriptor has a controller service definition
     * and its value is non-null; the value is used as the identifier of the required service.
     * Identifiers that the service provider cannot resolve are skipped.
     *
     * @return a new list without duplicates; iteration order is unspecified
     */
    @Override
    public List<ControllerServiceNode> getRequiredControllerServices() {
        final Set<ControllerServiceNode> requiredServices = new HashSet<>();
        for (final Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
            final String serviceId = entry.getValue();
            if (entry.getKey().getControllerServiceDefinition() == null || serviceId == null) {
                continue;
            }

            final ControllerServiceNode requiredNode = serviceProvider.getControllerServiceNode(serviceId);
            if (requiredNode == null) {
                // The property names a service the provider does not know; nothing to add.
                continue;
            }

            requiredServices.add(requiredNode);
            requiredServices.addAll(requiredNode.getRequiredControllerServices());
        }
        return new ArrayList<>(requiredServices);
    }

    /**
     * Removes the given component from the set of components referencing this service.
     */
    @Override
    public void removeReference(final ConfiguredComponent referencingComponent) {
        writeLock.lock();
        try {
            referencingComponents.remove(referencingComponent);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * @throws IllegalStateException if the service is in any state other than DISABLED
     */
    @Override
    public void verifyModifiable() throws IllegalStateException {
        if (getState() != ControllerServiceState.DISABLED) {
            throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. Please disable the Controller Service first.");
        }
    }

    /**
     * Sets the named property by delegating to the superclass.
     */
    @Override
    public void setProperty(final String name, final String value) {
        super.setProperty(name, value);
    }

    /**
     * Removes the named property by delegating to the superclass.
     *
     * @return whatever the superclass implementation returns
     */
    @Override
    public boolean removeProperty(String name) {
        return super.removeProperty(name);
    }

    /**
     * @throws IllegalStateException if the service is in any state other than DISABLED
     */
    @Override
    public void verifyCanDelete() {
        requireDisabled("deleted");
    }

    /**
     * Equivalent to {@link #verifyCanDisable(Set)} with no references ignored.
     */
    @Override
    public void verifyCanDisable() {
        verifyCanDisable(Collections.<ControllerServiceNode>emptySet());
    }

    /**
     * Verifies that this service is active and that no active referencing component, other than
     * those in {@code ignoreReferences}, still depends on it.
     *
     * @param ignoreReferences references that must not block disabling
     * @throws IllegalStateException if the service is not active or is still actively referenced
     */
    @Override
    public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) {
        if (!this.isActive()) {
            throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation().getIdentifier() + " because it is not enabled");
        }

        final Set<String> blockingIds = new HashSet<>();
        for (final ConfiguredComponent activeReference : getReferences().getActiveReferences()) {
            if (!ignoreReferences.contains(activeReference)) {
                blockingIds.add(activeReference.getIdentifier());
            }
        }

        if (!blockingIds.isEmpty()) {
            throw new IllegalStateException(implementation.getIdentifier() + " cannot be disabled because it is referenced by " + blockingIds.size() +
                " components that are currently running: [" + StringUtils.join(blockingIds, ", ") + "]");
        }
    }

    /**
     * @throws IllegalStateException if the service is not DISABLED or is not valid
     */
    @Override
    public void verifyCanEnable() {
        requireDisabled("enabled");

        if (!isValid()) {
            throw new IllegalStateException(implementation.getIdentifier() + " cannot be enabled because it is not valid: " + getValidationErrors());
        }
    }

    /**
     * Verifies that this service can be enabled, passing the identifiers of
     * {@code ignoredReferences} to the validation performed by the superclass.
     *
     * @param ignoredReferences services whose identifiers are handed to {@code getValidationErrors}
     * @throws IllegalStateException if the service is not DISABLED or any validation result is invalid
     */
    @Override
    public void verifyCanEnable(final Set<ControllerServiceNode> ignoredReferences) {
        requireDisabled("enabled");

        final Set<String> ignoredIds = new HashSet<>();
        for (final ControllerServiceNode node : ignoredReferences) {
            ignoredIds.add(node.getIdentifier());
        }

        final Collection<ValidationResult> validationResults = getValidationErrors(ignoredIds);
        for (final ValidationResult result : validationResults) {
            if (!result.isValid()) {
                throw new IllegalStateException(implementation.getIdentifier() + " cannot be enabled because it is not valid: " + result);
            }
        }
    }

    /**
     * @throws IllegalStateException if the service is in any state other than DISABLED
     */
    @Override
    public void verifyCanUpdate() {
        requireDisabled("updated");
    }

    /**
     * Applies the same rule as {@link #verifyCanUpdate()}.
     */
    @Override
    public void verifyCanClearState() {
        verifyCanUpdate();
    }

    /**
     * Returns the comments stored for this service, or {@code null} if none have been set.
     */
    @Override
    public String getComments() {
        readLock.lock();
        try {
            return comment;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Stores the given comments for this service.
     */
    @Override
    public void setComments(final String comment) {
        writeLock.lock();
        try {
            this.comment = comment;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Returns the current lifecycle state.
     */
    @Override
    public ControllerServiceState getState() {
        return stateRef.get();
    }

    /**
     * Returns {@code true} from the moment enabling starts until a disable is requested.
     */
    @Override
    public boolean isActive() {
        return this.active.get();
    }

    /**
     * Will atomically enable this service by invoking its @OnEnabled operation.
     * It uses a CAS operation on {@link #stateRef} to transition this service
     * from DISABLED to ENABLING state. If such transition succeeds the service
     * will be marked as 'active' (see {@link ControllerServiceNode#isActive()}).
     * If such transition doesn't succeed then no enabling logic will be
     * performed and the method will exit. In other words it is safe to invoke
     * this operation multiple times and from multiple threads.
     * <br>
     * This operation will also re-try enabling the service, after
     * {@code administrativeYieldMillis}, in the event of an exception being thrown
     * by the previous invocation of @OnEnabled while the service is still active.
     * <br>
     * Upon successful invocation of @OnEnabled this service will be transitioned to
     * ENABLED state.
     * <br>
     * In the event where enabling took longer than expected by the user and such user
     * initiated a disable operation, this service will be automatically disabled as soon
     * as its @OnEnabled operation has completed.
     *
     * @param scheduler executes the enabling task and schedules its re-tries
     * @param administrativeYieldMillis delay in milliseconds before a failed enabling attempt is re-tried
     */
    @Override
    public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis) {
        if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) {
            this.active.set(true);
            final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, variableRegistry);
            scheduler.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext);

                        final boolean shouldEnable;
                        // Same monitor as disable(): the 'active' check and the state transition must not
                        // interleave with a concurrent disable request.
                        synchronized (active) {
                            shouldEnable = active.get() && stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED);
                        }

                        if (!shouldEnable) {
                            // Inside this Runnable a bare 'this' is the Runnable, so name the service explicitly.
                            LOG.debug("Disabling service {} after it has been enabled due to disable action being initiated.", StandardControllerServiceNode.this);
                            // Can only happen if the user initiated a DISABLE operation before the service finished
                            // enabling. Its state will have been set to DISABLING (see disable() operation).
                            invokeDisable(configContext);
                            stateRef.set(ControllerServiceState.DISABLED);
                        }
                    } catch (Exception e) {
                        reportLifecycleFailure("OnEnabled", e);
                        invokeDisable(configContext);

                        if (isActive()) {
                            // Still wanted: run this same task again after the administrative yield.
                            scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
                        } else {
                            // NOTE(review): @OnDisabled was already invoked by invokeDisable() just above, so it
                            // runs a second time on this path. Kept as-is; confirm whether that is intended.
                            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
                            stateRef.set(ControllerServiceState.DISABLED);
                        }
                    }
                }
            });
        }
    }

    /**
     * Will atomically disable this service by invoking its @OnDisabled operation.
     * The service is first de-activated (see {@link ControllerServiceNode#isActive()}).
     * A CAS operation on {@link #stateRef} then attempts to transition this service
     * from ENABLED to DISABLING state. If such transition succeeds, @OnDisabled is
     * invoked on the given scheduler and the service is transitioned to DISABLED state
     * afterwards, whether or not @OnDisabled succeeded.
     * <br>
     * If such transition doesn't succeed, a transition from ENABLING to DISABLING is
     * attempted instead so that the pending enabling task cannot move the service to
     * ENABLED. In that case the enabling task itself performs the disabling once
     * @OnEnabled has completed (see {@link #enable(ScheduledExecutorService, long)}).
     *
     * @param scheduler executes the disabling task
     */
    @Override
    public void disable(ScheduledExecutorService scheduler) {
        /*
         * The reason for synchronization is to ensure consistency of the
         * service state when another thread is in the middle of enabling this
         * service since it will attempt to transition service state from
         * ENABLING to ENABLED but only if it's active.
         */
        synchronized (this.active) {
            this.active.set(false);
        }

        if (this.stateRef.compareAndSet(ControllerServiceState.ENABLED, ControllerServiceState.DISABLING)) {
            final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, variableRegistry);
            scheduler.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        invokeDisable(configContext);
                    } finally {
                        stateRef.set(ControllerServiceState.DISABLED);
                    }
                }
            });
        } else {
            this.stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.DISABLING);
        }
    }

    /**
     * Invokes the @OnDisabled methods of the service implementation. Any exception they raise is
     * logged and not propagated.
     *
     * @param configContext the context passed to the @OnDisabled methods
     */
    private void invokeDisable(ConfigurationContext configContext) {
        try {
            ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext);
        } catch (Exception e) {
            reportLifecycleFailure("OnDisabled", e);
        }
    }

    /**
     * Logs a failed lifecycle-method invocation to both the component log and the framework log.
     *
     * @param annotationName simple name of the lifecycle annotation, without the leading '@'
     * @param e the exception raised by the invocation
     */
    private void reportLifecycleFailure(final String annotationName, final Exception e) {
        // Report the target's own exception rather than the reflection wrapper, but fall back to the
        // wrapper when it carries no cause so that logging can never fail with a NullPointerException.
        final Throwable wrapped = e instanceof InvocationTargetException ? e.getCause() : null;
        final Throwable cause = wrapped == null ? e : wrapped;

        final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
        componentLog.error("Failed to invoke @" + annotationName + " method due to {}", cause);
        LOG.error("Failed to invoke @" + annotationName + " method of {} due to {}", getControllerServiceImplementation(), cause.toString());
    }

    /**
     * Throws if the service is in any state other than DISABLED.
     *
     * @param action past participle used in the error message, e.g. "deleted"
     * @throws IllegalStateException if the service is not DISABLED
     */
    private void requireDisabled(final String action) {
        if (getState() != ControllerServiceState.DISABLED) {
            throw new IllegalStateException(implementation.getIdentifier() + " cannot be " + action + " because it is not disabled");
        }
    }

    /**
     * Returns the identifier of the owning process group, or {@code null} if no group has been set.
     */
    @Override
    protected String getProcessGroupIdentifier() {
        final ProcessGroup procGroup = getProcessGroup();
        return procGroup == null ? null : procGroup.getIdentifier();
    }
}