blob: 5c8a8e8bdcb48e2abc45c155bd272b2097a94230 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.state.MockStateManager;
import org.junit.jupiter.api.Assertions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static java.util.Objects.requireNonNull;
public class MockProcessContext extends MockControllerServiceLookup implements ProcessContext, ControllerServiceLookup, NodeTypeProvider {
private final ConfigurableComponent component;
private final String componentName;
private final Map<PropertyDescriptor, String> properties = new HashMap<>();
private final StateManager stateManager;
private final VariableRegistry variableRegistry;
private String annotationData = null;
private boolean yieldCalled = false;
private boolean enableExpressionValidation = false;
private boolean allowExpressionValidation = true;
private volatile boolean incomingConnection = true;
private volatile boolean nonLoopConnection = true;
private volatile InputRequirement inputRequirement = null;
private int maxConcurrentTasks = 1;
private volatile Set<Relationship> connections = new HashSet<>();
private volatile Set<Relationship> unavailableRelationships = new HashSet<>();
private volatile boolean isClustered;
private volatile boolean isConfiguredForClustering;
private volatile boolean isPrimaryNode;
private volatile boolean isConnected = true;
public MockProcessContext(final ConfigurableComponent component) {
this(component, null);
}
public MockProcessContext(final ConfigurableComponent component, final String componentName) {
this(component, componentName, new MockStateManager(component), VariableRegistry.EMPTY_REGISTRY);
}
/**
* Creates a new MockProcessContext for the given Processor
*
* @param component being mocked
* @param stateManager state manager
* @param variableRegistry variableRegistry
*/
public MockProcessContext(final ConfigurableComponent component, final StateManager stateManager, final VariableRegistry variableRegistry) {
this(component,null,stateManager,variableRegistry);
}
public MockProcessContext(final ControllerService component,
final MockProcessContext context,
final StateManager stateManager,
final VariableRegistry variableRegistry) {
this(component, null, context, stateManager, variableRegistry);
}
public MockProcessContext(final ControllerService component,
final String componentName,
final MockProcessContext context,
final StateManager stateManager,
final VariableRegistry variableRegistry) {
this(component, componentName, stateManager, variableRegistry);
try {
annotationData = context.getControllerServiceAnnotationData(component);
final Map<PropertyDescriptor, String> props = context.getControllerServiceProperties(component);
properties.putAll(props);
super.addControllerServices(context);
} catch (IllegalArgumentException e) {
// do nothing...the service is being loaded
}
}
/**
* Creates a new MockProcessContext for the given Processor with given name
*
* @param component being mocked
* @param componentName the name to be given the component;
* @param stateManager state manager
* @param variableRegistry variableRegistry
*/
public MockProcessContext(final ConfigurableComponent component,
final String componentName,
final StateManager stateManager,
final VariableRegistry variableRegistry) {
this.component = Objects.requireNonNull(component);
this.componentName = componentName == null ? "" : componentName;
this.inputRequirement = component.getClass().getAnnotation(InputRequirement.class);
this.stateManager = stateManager;
this.variableRegistry = variableRegistry;
}
@Override
public PropertyValue getProperty(final PropertyDescriptor descriptor) {
return getProperty(descriptor.getName());
}
public PropertyValue getPropertyWithoutValidatingExpressions(final PropertyDescriptor propertyDescriptor) {
final PropertyDescriptor canonicalDescriptor = component.getPropertyDescriptor(propertyDescriptor.getName());
if (canonicalDescriptor == null) {
return null;
}
final String setPropertyValue = properties.get(canonicalDescriptor);
final String propValue = (setPropertyValue == null) ? canonicalDescriptor.getDefaultValue() : setPropertyValue;
final MockPropertyValue propertyValue = new MockPropertyValue(propValue, this, canonicalDescriptor, true, variableRegistry);
return propertyValue;
}
@Override
public PropertyValue getProperty(final String propertyName) {
final PropertyDescriptor descriptor = component.getPropertyDescriptor(propertyName);
if (descriptor == null) {
return null;
}
final String setPropertyValue = properties.get(descriptor);
final String propValue = (setPropertyValue == null) ? descriptor.getDefaultValue() : setPropertyValue;
final boolean alreadyEvaluated = !this.allowExpressionValidation;
final MockPropertyValue propertyValue = new MockPropertyValue(propValue, this, descriptor, alreadyEvaluated, variableRegistry);
return propertyValue;
}
@Override
public PropertyValue newPropertyValue(final String rawValue) {
return new MockPropertyValue(rawValue, this, variableRegistry);
}
public ValidationResult setProperty(final String propertyName, final String propertyValue) {
return setProperty(new PropertyDescriptor.Builder().name(propertyName).build(), propertyValue);
}
public PropertyDescriptor getPropertyDescriptor(final String propertyName) {
return component.getPropertyDescriptor(propertyName);
}
/**
* Updates the value of the property with the given PropertyDescriptor to
* the specified value IF and ONLY IF the value is valid according to the
* descriptor's validator. Otherwise, the property value is not updated. In
* either case, the ValidationResult is returned, indicating whether or not
* the property is valid
*
* @param descriptor of property to modify
* @param value new value
* @return result
*/
public ValidationResult setProperty(final PropertyDescriptor descriptor, final String value) {
requireNonNull(descriptor);
requireNonNull(value, "Cannot set property to null value; if the intent is to remove the property, call removeProperty instead");
final PropertyDescriptor fullyPopulatedDescriptor = component.getPropertyDescriptor(descriptor.getName());
final ValidationResult result = fullyPopulatedDescriptor.validate(value, new MockValidationContext(this, stateManager, variableRegistry));
String oldValue = properties.put(fullyPopulatedDescriptor, value);
if (oldValue == null) {
oldValue = fullyPopulatedDescriptor.getDefaultValue();
}
if ((value == null && oldValue != null) || (value != null && !value.equals(oldValue))) {
component.onPropertyModified(fullyPopulatedDescriptor, oldValue, value);
}
return result;
}
public boolean removeProperty(final PropertyDescriptor descriptor) {
Objects.requireNonNull(descriptor);
return removeProperty(descriptor.getName());
}
public boolean removeProperty(final String property) {
Objects.requireNonNull(property);
final PropertyDescriptor fullyPopulatedDescriptor = component.getPropertyDescriptor(property);
String value = null;
if ((value = properties.remove(fullyPopulatedDescriptor)) != null) {
if (!value.equals(fullyPopulatedDescriptor.getDefaultValue())) {
component.onPropertyModified(fullyPopulatedDescriptor, value, null);
}
return true;
}
return false;
}
public void clearProperties() {
Map<PropertyDescriptor, String> properties = getProperties();
for (Map.Entry<PropertyDescriptor, String> e : properties.entrySet()) {
removeProperty(e.getKey());
}
}
@Override
public void yield() {
yieldCalled = true;
}
public boolean isYieldCalled() {
return yieldCalled;
}
public void addControllerService(final String serviceIdentifier, final ControllerService controllerService, final Map<PropertyDescriptor, String> properties, final String annotationData) {
requireNonNull(controllerService);
final ControllerServiceConfiguration config = addControllerService(controllerService);
config.setProperties(properties);
config.setAnnotationData(annotationData);
}
@Override
public int getMaxConcurrentTasks() {
return maxConcurrentTasks;
}
@Override
public ExecutionNode getExecutionNode() {
return ExecutionNode.ALL;
}
public void setAnnotationData(final String annotationData) {
this.annotationData = annotationData;
}
@Override
public String getAnnotationData() {
return annotationData;
}
@Override
public Map<PropertyDescriptor, String> getProperties() {
final List<PropertyDescriptor> supported = component.getPropertyDescriptors();
if (supported == null || supported.isEmpty()) {
return Collections.unmodifiableMap(properties);
} else {
final Map<PropertyDescriptor, String> props = new LinkedHashMap<>();
for (final PropertyDescriptor descriptor : supported) {
props.put(descriptor, null);
}
props.putAll(properties);
return props;
}
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
/**
* Validates the current properties, returning ValidationResults for any
* invalid properties. All processor defined properties will be validated.
* If they are not included in the in the purposed configuration, the
* default value will be used.
*
* @return Collection of validation result objects for any invalid findings
* only. If the collection is empty then the processor is valid. Guaranteed
* non-null
*/
public Collection<ValidationResult> validate() {
final List<ValidationResult> results = new ArrayList<>();
final ValidationContext validationContext = new MockValidationContext(this, stateManager, variableRegistry);
final Collection<ValidationResult> componentResults = component.validate(validationContext);
results.addAll(componentResults);
final Collection<ValidationResult> serviceResults = validateReferencedControllerServices(validationContext);
results.addAll(serviceResults);
// verify all controller services are enabled
for (Map.Entry<String, ControllerServiceConfiguration> service : getControllerServices().entrySet()) {
if (!service.getValue().isEnabled()) {
results.add(new ValidationResult.Builder()
.explanation("Controller service " + service.getKey() + " for " + this.getName() + " is not enabled")
.valid(false)
.build());
}
}
return results;
}
protected final Collection<ValidationResult> validateReferencedControllerServices(final ValidationContext validationContext) {
final List<PropertyDescriptor> supportedDescriptors = component.getPropertyDescriptors();
if (supportedDescriptors == null) {
return Collections.emptyList();
}
final Collection<ValidationResult> validationResults = new ArrayList<>();
for (final PropertyDescriptor descriptor : supportedDescriptors) {
if (descriptor.getControllerServiceDefinition() == null) {
// skip properties that aren't for a controller service
continue;
}
final String controllerServiceId = validationContext.getProperty(descriptor).getValue();
if (controllerServiceId == null) {
continue;
}
final ControllerService controllerService = getControllerService(controllerServiceId);
if (controllerService == null) {
final ValidationResult result = new ValidationResult.Builder()
.valid(false)
.subject(descriptor.getDisplayName())
.input(controllerServiceId)
.explanation("Invalid Controller Service: " + controllerServiceId + " is not a valid Controller Service Identifier")
.build();
validationResults.add(result);
continue;
}
final Class<? extends ControllerService> requiredServiceClass = descriptor.getControllerServiceDefinition();
if (!requiredServiceClass.isAssignableFrom(controllerService.getClass())) {
final ValidationResult result = new ValidationResult.Builder()
.valid(false)
.subject(descriptor.getDisplayName())
.input(controllerServiceId)
.explanation("Invalid Controller Service: " + controllerServiceId + " does not implement interface " + requiredServiceClass)
.build();
validationResults.add(result);
continue;
}
final boolean enabled = isControllerServiceEnabled(controllerServiceId);
if (!enabled) {
validationResults.add(new ValidationResult.Builder()
.input(controllerServiceId)
.subject(descriptor.getDisplayName())
.explanation("Controller Service with ID " + controllerServiceId + " is not enabled")
.valid(false)
.build());
}
}
return validationResults;
}
public boolean isValid() {
for (final ValidationResult result : validate()) {
if (!result.isValid()) {
return false;
}
}
return true;
}
public void assertValid() {
final StringBuilder sb = new StringBuilder();
int failureCount = 0;
for (final ValidationResult result : validate()) {
if (!result.isValid()) {
sb.append(result.toString()).append("\n");
failureCount++;
}
}
if (failureCount > 0) {
Assertions.fail("Processor has " + failureCount + " validation failures:\n" + sb.toString());
}
}
@Override
public String encrypt(final String unencrypted) {
return "enc{" + unencrypted + "}";
}
@Override
public String decrypt(final String encrypted) {
if (encrypted.startsWith("enc{") && encrypted.endsWith("}")) {
return encrypted.substring(4, encrypted.length() - 1);
}
return encrypted;
}
public void setValidateExpressionUsage(final boolean validate) {
allowExpressionValidation = validate;
}
public void enableExpressionValidation() {
enableExpressionValidation = true;
}
public void disableExpressionValidation() {
enableExpressionValidation = false;
}
Map<PropertyDescriptor, String> getControllerServiceProperties(final ControllerService controllerService) {
return super.getConfiguration(controllerService.getIdentifier()).getProperties();
}
String getControllerServiceAnnotationData(final ControllerService controllerService) {
return super.getConfiguration(controllerService.getIdentifier()).getAnnotationData();
}
@Override
public ControllerServiceLookup getControllerServiceLookup() {
return this;
}
@Override
public Set<Relationship> getAvailableRelationships() {
if (!(component instanceof Processor)) {
return Collections.emptySet();
}
final Set<Relationship> relationships = new HashSet<>(((Processor) component).getRelationships());
relationships.removeAll(unavailableRelationships);
return relationships;
}
public void setUnavailableRelationships(final Set<Relationship> relationships) {
this.unavailableRelationships = Collections.unmodifiableSet(new HashSet<>(relationships));
}
public Set<Relationship> getUnavailableRelationships() {
return unavailableRelationships;
}
@Override
public boolean hasIncomingConnection() {
return incomingConnection;
}
public void setIncomingConnection(final boolean hasIncomingConnection) {
this.incomingConnection = hasIncomingConnection;
}
@Override
public boolean hasConnection(Relationship relationship) {
return this.connections.contains(relationship);
}
public void setNonLoopConnection(final boolean hasNonLoopConnection) {
this.nonLoopConnection = hasNonLoopConnection;
}
@Override
public boolean hasNonLoopConnection() {
return nonLoopConnection;
}
public void addConnection(final Relationship relationship) {
this.connections.add(relationship);
}
public void removeConnection(final Relationship relationship) {
this.connections.remove(relationship);
}
public void setConnections(final Set<Relationship> connections) {
if (connections == null) {
this.connections = Collections.emptySet();
} else {
this.connections = Collections.unmodifiableSet(connections);
}
}
@Override
public boolean isExpressionLanguagePresent(final PropertyDescriptor property) {
if (property == null || !property.isExpressionLanguageSupported()) {
return false;
}
final List<Range> elRanges = Query.extractExpressionRanges(getProperty(property).getValue());
return (elRanges != null && !elRanges.isEmpty());
}
@Override
public StateManager getStateManager() {
return stateManager;
}
@Override
public String getName() {
return componentName;
}
protected void setMaxConcurrentTasks(int maxConcurrentTasks) {
this.maxConcurrentTasks = maxConcurrentTasks;
}
@Override
public boolean isClustered() {
return isClustered;
}
@Override
public boolean isConfiguredForClustering() {
return isConfiguredForClustering;
}
@Override
public boolean isPrimary() {
return isPrimaryNode;
}
public void setClustered(boolean clustered) {
isClustered = clustered;
}
public void setIsConfiguredForClustering(final boolean isConfiguredForClustering) {
this.isConfiguredForClustering = isConfiguredForClustering;
}
public void setPrimaryNode(boolean primaryNode) {
if (!isConfiguredForClustering && primaryNode) {
throw new IllegalArgumentException("Primary node is only available in cluster. Use setIsConfiguredForClustering(true) first.");
}
isPrimaryNode = primaryNode;
}
@Override
public InputRequirement getInputRequirement() {
return inputRequirement;
}
public void setConnected(boolean connected) {
isConnected = connected;
}
@Override
public boolean isConnectedToCluster() {
return isConnected;
}
}