/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.util;

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.state.MockStateManager;
import org.junit.jupiter.api.Assertions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static java.util.Objects.requireNonNull;

/**
 * In-memory {@link ProcessContext} implementation for exercising a {@link ConfigurableComponent}
 * in tests. Property values, annotation data, connection/relationship settings and cluster flags
 * are held in plain fields and can be set and inspected directly through the methods of this class.
 * Controller service bookkeeping is inherited from {@link MockControllerServiceLookup}.
 */
public class MockProcessContext extends MockControllerServiceLookup implements ProcessContext, ControllerServiceLookup, NodeTypeProvider {

    private final ConfigurableComponent component;
    private final String componentName;
    // Explicitly set property values only; defaults are resolved from the descriptor on read.
    private final Map<PropertyDescriptor, String> properties = new HashMap<>();
    private final StateManager stateManager;
    private final VariableRegistry variableRegistry;

    private String annotationData = null;
    private boolean yieldCalled = false;
    // Toggled by enableExpressionValidation()/disableExpressionValidation(); not read anywhere in this class.
    private boolean enableExpressionValidation = false;
    // Set through setValidateExpressionUsage(); consulted by getProperty(String).
    private boolean allowExpressionValidation = true;
    private volatile boolean incomingConnection = true;
    private volatile boolean nonLoopConnection = true;
    private volatile InputRequirement inputRequirement = null;
    private int maxConcurrentTasks = 1;

    private volatile Set<Relationship> connections = new HashSet<>();
    private volatile Set<Relationship> unavailableRelationships = new HashSet<>();

    private volatile boolean isClustered;
    private volatile boolean isConfiguredForClustering;
    private volatile boolean isPrimaryNode;
    private volatile boolean isConnected = true;

    /**
     * Creates a context for the given component with no explicit name, a new
     * {@link MockStateManager} and {@link VariableRegistry#EMPTY_REGISTRY}.
     *
     * @param component being mocked; must not be null
     */
    public MockProcessContext(final ConfigurableComponent component) {
        this(component, null);
    }

    /**
     * Creates a context for the given component and name, backed by a new
     * {@link MockStateManager} and {@link VariableRegistry#EMPTY_REGISTRY}.
     *
     * @param component being mocked; must not be null
     * @param componentName the name to be given the component; {@code null} is treated as an empty name
     */
    public MockProcessContext(final ConfigurableComponent component, final String componentName) {
        this(component, componentName, new MockStateManager(component), VariableRegistry.EMPTY_REGISTRY);
    }

    /**
     * Creates a new MockProcessContext for the given Processor
     *
     * @param component being mocked
     * @param stateManager state manager
     * @param variableRegistry variableRegistry
     */
    public MockProcessContext(final ConfigurableComponent component, final StateManager stateManager, final VariableRegistry variableRegistry) {
        this(component, null, stateManager, variableRegistry);
    }

    /**
     * Creates an unnamed context for a controller service, seeded from another context.
     *
     * @param component the controller service being mocked
     * @param context the context to copy the service's configuration and controller services from; must not be null
     * @param stateManager state manager
     * @param variableRegistry variableRegistry
     */
    public MockProcessContext(final ControllerService component,
                              final MockProcessContext context,
                              final StateManager stateManager,
                              final VariableRegistry variableRegistry) {
        this(component, null, context, stateManager, variableRegistry);
    }

    /**
     * Creates a context for a controller service, seeded from another context: the annotation data
     * and properties that {@code context} holds for the service are copied, and the controller
     * services of {@code context} are added to this one.
     * <p>
     * If an {@link IllegalArgumentException} is raised while copying, it is deliberately ignored and
     * whatever had not been copied yet is left at its initial value.
     *
     * @param component the controller service being mocked
     * @param componentName the name to be given the component; {@code null} is treated as an empty name
     * @param context the context to copy from; must not be null
     * @param stateManager state manager
     * @param variableRegistry variableRegistry
     */
    public MockProcessContext(final ControllerService component,
                              final String componentName,
                              final MockProcessContext context,
                              final StateManager stateManager,
                              final VariableRegistry variableRegistry) {
        this(component, componentName, stateManager, variableRegistry);

        try {
            annotationData = context.getControllerServiceAnnotationData(component);

            final Map<PropertyDescriptor, String> props = context.getControllerServiceProperties(component);
            properties.putAll(props);

            super.addControllerServices(context);
        } catch (IllegalArgumentException ignored) {
            // do nothing...the service is being loaded
        }
    }

    /**
     * Creates a new MockProcessContext for the given Processor with given name
     *
     * @param component being mocked; must not be null
     * @param componentName the name to be given the component; {@code null} is treated as an empty name
     * @param stateManager state manager
     * @param variableRegistry variableRegistry
     */
    public MockProcessContext(final ConfigurableComponent component,
                              final String componentName,
                              final StateManager stateManager,
                              final VariableRegistry variableRegistry) {
        this.component = Objects.requireNonNull(component);
        this.componentName = componentName == null ? "" : componentName;
        // Null when the component class carries no @InputRequirement annotation.
        this.inputRequirement = component.getClass().getAnnotation(InputRequirement.class);
        this.stateManager = stateManager;
        this.variableRegistry = variableRegistry;
    }

    /**
     * Looks the property up by the descriptor's name; see {@link #getProperty(String)}.
     *
     * @param descriptor the descriptor whose name identifies the property; must not be null
     * @return the property value, or {@code null} if the component returns no descriptor for that name
     */
    @Override
    public PropertyValue getProperty(final PropertyDescriptor descriptor) {
        return getProperty(descriptor.getName());
    }

    /**
     * Same lookup as {@link #getProperty(String)}, except that the resulting
     * {@link MockPropertyValue} is always created with its "already evaluated" flag set to
     * {@code true}, independent of {@link #setValidateExpressionUsage(boolean)}.
     *
     * @param propertyDescriptor the descriptor whose name identifies the property; must not be null
     * @return the property value, or {@code null} if the component returns no descriptor for that name
     */
    public PropertyValue getPropertyWithoutValidatingExpressions(final PropertyDescriptor propertyDescriptor) {
        final PropertyDescriptor canonicalDescriptor = component.getPropertyDescriptor(propertyDescriptor.getName());
        if (canonicalDescriptor == null) {
            return null;
        }

        final String setPropertyValue = properties.get(canonicalDescriptor);
        final String propValue = (setPropertyValue == null) ? canonicalDescriptor.getDefaultValue() : setPropertyValue;

        return new MockPropertyValue(propValue, this, canonicalDescriptor, true, variableRegistry);
    }

    /**
     * Resolves the named property through the component's descriptor, falling back to the
     * descriptor's default value when no value has been set explicitly.
     *
     * @param propertyName name of the property
     * @return the property value, or {@code null} if the component returns no descriptor for that name
     */
    @Override
    public PropertyValue getProperty(final String propertyName) {
        final PropertyDescriptor descriptor = component.getPropertyDescriptor(propertyName);
        if (descriptor == null) {
            return null;
        }

        final String setPropertyValue = properties.get(descriptor);
        final String propValue = (setPropertyValue == null) ? descriptor.getDefaultValue() : setPropertyValue;

        // When expression validation is not allowed, the value is handed out flagged as already evaluated.
        final boolean alreadyEvaluated = !this.allowExpressionValidation;
        return new MockPropertyValue(propValue, this, descriptor, alreadyEvaluated, variableRegistry);
    }

    /**
     * Wraps an arbitrary raw string in a {@link MockPropertyValue} bound to this context.
     *
     * @param rawValue the raw value
     * @return a new property value
     */
    @Override
    public PropertyValue newPropertyValue(final String rawValue) {
        return new MockPropertyValue(rawValue, this, variableRegistry);
    }

    /**
     * Sets a property by name; see {@link #setProperty(PropertyDescriptor, String)}.
     *
     * @param propertyName name of the property to modify
     * @param propertyValue new value; must not be null
     * @return the validation result for the new value
     */
    public ValidationResult setProperty(final String propertyName, final String propertyValue) {
        return setProperty(new PropertyDescriptor.Builder().name(propertyName).build(), propertyValue);
    }

    /**
     * @param propertyName name of the property
     * @return whatever descriptor the component returns for the given name
     */
    public PropertyDescriptor getPropertyDescriptor(final String propertyName) {
        return component.getPropertyDescriptor(propertyName);
    }

    /**
     * Validates the given value against the component's descriptor of the same name and then
     * stores it. The value is stored regardless of the validation outcome; the returned
     * ValidationResult indicates whether or not the value is valid. If the new value differs from
     * the previous one (or from the descriptor's default when nothing was set before), the
     * component's {@code onPropertyModified} callback is invoked.
     *
     * @param descriptor of property to modify; must not be null
     * @param value new value; must not be null (use {@link #removeProperty(PropertyDescriptor)} to unset)
     * @return result
     * @throws NullPointerException if {@code descriptor} or {@code value} is null
     */
    public ValidationResult setProperty(final PropertyDescriptor descriptor, final String value) {
        requireNonNull(descriptor);
        requireNonNull(value, "Cannot set property to null value; if the intent is to remove the property, call removeProperty instead");
        final PropertyDescriptor fullyPopulatedDescriptor = component.getPropertyDescriptor(descriptor.getName());

        final ValidationResult result = fullyPopulatedDescriptor.validate(value, new MockValidationContext(this, stateManager, variableRegistry));
        String oldValue = properties.put(fullyPopulatedDescriptor, value);
        if (oldValue == null) {
            oldValue = fullyPopulatedDescriptor.getDefaultValue();
        }

        // value is guaranteed non-null by the check above, so a plain equals() covers every case
        if (!value.equals(oldValue)) {
            component.onPropertyModified(fullyPopulatedDescriptor, oldValue, value);
        }

        return result;
    }

    /**
     * Removes a property by the descriptor's name; see {@link #removeProperty(String)}.
     *
     * @param descriptor of property to remove; must not be null
     * @return {@code true} if an explicitly set value was removed
     */
    public boolean removeProperty(final PropertyDescriptor descriptor) {
        Objects.requireNonNull(descriptor);
        return removeProperty(descriptor.getName());
    }

    /**
     * Removes the explicitly set value of the named property, if any. When the removed value
     * differs from the descriptor's default, the component's {@code onPropertyModified} callback
     * is invoked with a {@code null} new value.
     *
     * @param property name of the property to remove; must not be null
     * @return {@code true} if an explicitly set value was removed, {@code false} if none was set
     */
    public boolean removeProperty(final String property) {
        Objects.requireNonNull(property);
        final PropertyDescriptor fullyPopulatedDescriptor = component.getPropertyDescriptor(property);

        final String removedValue = properties.remove(fullyPopulatedDescriptor);
        if (removedValue == null) {
            return false;
        }

        if (!removedValue.equals(fullyPopulatedDescriptor.getDefaultValue())) {
            component.onPropertyModified(fullyPopulatedDescriptor, removedValue, null);
        }
        return true;
    }

    /**
     * Removes every property reported by {@link #getProperties()} via
     * {@link #removeProperty(PropertyDescriptor)}.
     */
    public void clearProperties() {
        // Iterate over a snapshot of the keys: getProperties() can return a live view of the
        // backing map, and removeProperty() mutates that map, which would otherwise fail with a
        // ConcurrentModificationException.
        final List<PropertyDescriptor> descriptors = new ArrayList<>(getProperties().keySet());
        for (final PropertyDescriptor descriptor : descriptors) {
            removeProperty(descriptor);
        }
    }

    /**
     * Records that a yield was requested; query with {@link #isYieldCalled()}.
     */
    @Override
    public void yield() {
        yieldCalled = true;
    }

    /**
     * @return {@code true} once {@link #yield()} has been called on this context
     */
    public boolean isYieldCalled() {
        return yieldCalled;
    }

    /**
     * Registers the controller service through {@code addControllerService(controllerService)} and
     * applies the given properties and annotation data to the resulting configuration.
     * <p>
     * NOTE(review): {@code serviceIdentifier} is not used by this method; which identifier the
     * service ends up registered under is decided by the single-argument overload.
     *
     * @param serviceIdentifier identifier of the service (currently unused here)
     * @param controllerService the service to register; must not be null
     * @param properties properties to set on the service's configuration
     * @param annotationData annotation data to set on the service's configuration
     */
    public void addControllerService(final String serviceIdentifier, final ControllerService controllerService, final Map<PropertyDescriptor, String> properties, final String annotationData) {
        requireNonNull(controllerService);
        final ControllerServiceConfiguration config = addControllerService(controllerService);
        config.setProperties(properties);
        config.setAnnotationData(annotationData);
    }

    /**
     * @return the configured number of concurrent tasks (1 unless changed via {@link #setMaxConcurrentTasks(int)})
     */
    @Override
    public int getMaxConcurrentTasks() {
        return maxConcurrentTasks;
    }

    /**
     * @return always {@link ExecutionNode#ALL}
     */
    @Override
    public ExecutionNode getExecutionNode() {
        return ExecutionNode.ALL;
    }

    /**
     * @param annotationData the annotation data to report from {@link #getAnnotationData()}
     */
    public void setAnnotationData(final String annotationData) {
        this.annotationData = annotationData;
    }

    /**
     * @return the annotation data last set on this context, possibly {@code null}
     */
    @Override
    public String getAnnotationData() {
        return annotationData;
    }

    /**
     * Returns the properties of this context.
     * <p>
     * If the component reports no supported descriptors, the result is an unmodifiable view of
     * the explicitly set properties. Otherwise it is a new map that lists every supported
     * descriptor first (mapped to {@code null} unless a value was set explicitly) followed by any
     * other explicitly set properties.
     *
     * @return descriptor-to-value map; values may be {@code null}
     */
    @Override
    public Map<PropertyDescriptor, String> getProperties() {
        final List<PropertyDescriptor> supported = component.getPropertyDescriptors();
        if (supported == null || supported.isEmpty()) {
            return Collections.unmodifiableMap(properties);
        }

        final Map<PropertyDescriptor, String> props = new LinkedHashMap<>();
        for (final PropertyDescriptor descriptor : supported) {
            props.put(descriptor, null);
        }
        props.putAll(properties);
        return props;
    }

    /**
     * @return the content of {@link #getProperties()} keyed by property name instead of descriptor;
     * values may be {@code null}
     */
    @Override
    public Map<String, String> getAllProperties() {
        final Map<String, String> propValueMap = new LinkedHashMap<>();
        for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
            propValueMap.put(entry.getKey().getName(), entry.getValue());
        }
        return propValueMap;
    }

    /**
     * Validates the current configuration. The result combines the component's own validation
     * results, the results of checking every controller service referenced by a supported
     * property, and one invalid result for each controller service registered with this context
     * that is not enabled.
     *
     * @return Collection of validation result objects. Guaranteed non-null
     */
    public Collection<ValidationResult> validate() {
        final List<ValidationResult> results = new ArrayList<>();
        final ValidationContext validationContext = new MockValidationContext(this, stateManager, variableRegistry);
        final Collection<ValidationResult> componentResults = component.validate(validationContext);
        results.addAll(componentResults);

        final Collection<ValidationResult> serviceResults = validateReferencedControllerServices(validationContext);
        results.addAll(serviceResults);

        // verify all controller services are enabled
        for (Map.Entry<String, ControllerServiceConfiguration> service : getControllerServices().entrySet()) {
            if (!service.getValue().isEnabled()) {
                results.add(new ValidationResult.Builder()
                        .explanation("Controller service " + service.getKey() + " for " + this.getName() + " is not enabled")
                        .valid(false)
                        .build());
            }
        }
        return results;
    }

    /**
     * Checks every supported property that identifies a controller service: the referenced
     * identifier must resolve to a known service, that service must implement the interface the
     * descriptor requires, and it must be enabled. Properties without a value are skipped.
     *
     * @param validationContext context used to read the property values
     * @return one invalid result per failed check; empty if there is nothing to report
     */
    protected final Collection<ValidationResult> validateReferencedControllerServices(final ValidationContext validationContext) {
        final List<PropertyDescriptor> supportedDescriptors = component.getPropertyDescriptors();
        if (supportedDescriptors == null) {
            return Collections.emptyList();
        }

        final Collection<ValidationResult> validationResults = new ArrayList<>();
        for (final PropertyDescriptor descriptor : supportedDescriptors) {
            if (descriptor.getControllerServiceDefinition() == null) {
                // skip properties that aren't for a controller service
                continue;
            }

            final String controllerServiceId = validationContext.getProperty(descriptor).getValue();
            if (controllerServiceId == null) {
                continue;
            }

            final ControllerService controllerService = getControllerService(controllerServiceId);
            if (controllerService == null) {
                final ValidationResult result = new ValidationResult.Builder()
                    .valid(false)
                    .subject(descriptor.getDisplayName())
                    .input(controllerServiceId)
                    .explanation("Invalid Controller Service: " + controllerServiceId + " is not a valid Controller Service Identifier")
                    .build();

                validationResults.add(result);
                continue;
            }

            final Class<? extends ControllerService> requiredServiceClass = descriptor.getControllerServiceDefinition();
            if (!requiredServiceClass.isAssignableFrom(controllerService.getClass())) {
                final ValidationResult result = new ValidationResult.Builder()
                    .valid(false)
                    .subject(descriptor.getDisplayName())
                    .input(controllerServiceId)
                    .explanation("Invalid Controller Service: " + controllerServiceId + " does not implement interface " + requiredServiceClass)
                    .build();

                validationResults.add(result);
                continue;
            }

            final boolean enabled = isControllerServiceEnabled(controllerServiceId);
            if (!enabled) {
                validationResults.add(new ValidationResult.Builder()
                    .input(controllerServiceId)
                    .subject(descriptor.getDisplayName())
                    .explanation("Controller Service with ID " + controllerServiceId + " is not enabled")
                    .valid(false)
                    .build());
            }
        }

        return validationResults;
    }

    /**
     * @return {@code true} if {@link #validate()} yields no invalid result
     */
    public boolean isValid() {
        for (final ValidationResult result : validate()) {
            if (!result.isValid()) {
                return false;
            }
        }

        return true;
    }

    /**
     * Fails the running test through {@link Assertions#fail(String)} if {@link #validate()} yields
     * at least one invalid result; the failure message lists every invalid result.
     */
    public void assertValid() {
        final StringBuilder sb = new StringBuilder();
        int failureCount = 0;

        for (final ValidationResult result : validate()) {
            if (!result.isValid()) {
                sb.append(result.toString()).append("\n");
                failureCount++;
            }
        }

        if (failureCount > 0) {
            Assertions.fail("Processor has " + failureCount + " validation failures:\n" + sb.toString());
        }
    }

    /**
     * Placeholder "encryption": wraps the input as {@code enc{...}}.
     *
     * @param unencrypted the text to wrap
     * @return the wrapped text
     */
    @Override
    public String encrypt(final String unencrypted) {
        return "enc{" + unencrypted + "}";
    }

    /**
     * Reverses {@link #encrypt(String)}: strips a surrounding {@code enc{...}} wrapper if present.
     *
     * @param encrypted the text to unwrap; must not be null
     * @return the unwrapped text, or the input unchanged if it is not wrapped
     */
    @Override
    public String decrypt(final String encrypted) {
        if (encrypted.startsWith("enc{") && encrypted.endsWith("}")) {
            return encrypted.substring(4, encrypted.length() - 1);
        }
        return encrypted;
    }

    /**
     * Controls the "already evaluated" flag that {@link #getProperty(String)} passes to the
     * property values it creates: the flag is the negation of {@code validate}.
     *
     * @param validate whether expression usage should be validated
     */
    public void setValidateExpressionUsage(final boolean validate) {
        allowExpressionValidation = validate;
    }

    /**
     * Sets the internal expression-validation flag to {@code true}.
     */
    public void enableExpressionValidation() {
        enableExpressionValidation = true;
    }

    /**
     * Sets the internal expression-validation flag to {@code false}.
     */
    public void disableExpressionValidation() {
        enableExpressionValidation = false;
    }

    /**
     * @param controllerService the service to look up by its identifier
     * @return the properties held by this context's configuration for the service
     */
    Map<PropertyDescriptor, String> getControllerServiceProperties(final ControllerService controllerService) {
        return super.getConfiguration(controllerService.getIdentifier()).getProperties();
    }

    /**
     * @param controllerService the service to look up by its identifier
     * @return the annotation data held by this context's configuration for the service
     */
    String getControllerServiceAnnotationData(final ControllerService controllerService) {
        return super.getConfiguration(controllerService.getIdentifier()).getAnnotationData();
    }

    /**
     * @return this context, which doubles as the controller service lookup
     */
    @Override
    public ControllerServiceLookup getControllerServiceLookup() {
        return this;
    }

    /**
     * @return a new set holding the processor's relationships minus those marked unavailable;
     * an empty set if the component is not a {@link Processor}
     */
    @Override
    public Set<Relationship> getAvailableRelationships() {
        if (!(component instanceof Processor)) {
            return Collections.emptySet();
        }

        final Set<Relationship> relationships = new HashSet<>(((Processor) component).getRelationships());
        relationships.removeAll(unavailableRelationships);
        return relationships;
    }

    /**
     * Replaces the set of unavailable relationships with an unmodifiable copy of the given set.
     *
     * @param relationships relationships to exclude from {@link #getAvailableRelationships()}; must not be null
     */
    public void setUnavailableRelationships(final Set<Relationship> relationships) {
        this.unavailableRelationships = Collections.unmodifiableSet(new HashSet<>(relationships));
    }

    /**
     * @return the relationships currently marked unavailable
     */
    public Set<Relationship> getUnavailableRelationships() {
        return unavailableRelationships;
    }

    /**
     * @return the incoming-connection flag ({@code true} unless changed)
     */
    @Override
    public boolean hasIncomingConnection() {
        return incomingConnection;
    }

    /**
     * @param hasIncomingConnection value to report from {@link #hasIncomingConnection()}
     */
    public void setIncomingConnection(final boolean hasIncomingConnection) {
        this.incomingConnection = hasIncomingConnection;
    }

    /**
     * @param relationship the relationship to test
     * @return {@code true} if the relationship is among the configured connections
     */
    @Override
    public boolean hasConnection(Relationship relationship) {
        return this.connections.contains(relationship);
    }

    /**
     * @param hasNonLoopConnection value to report from {@link #hasNonLoopConnection()}
     */
    public void setNonLoopConnection(final boolean hasNonLoopConnection) {
        this.nonLoopConnection = hasNonLoopConnection;
    }

    /**
     * @return the non-loop-connection flag ({@code true} unless changed)
     */
    @Override
    public boolean hasNonLoopConnection() {
        return nonLoopConnection;
    }

    /**
     * @param relationship relationship to add to the configured connections
     */
    public void addConnection(final Relationship relationship) {
        this.connections.add(relationship);
    }

    /**
     * @param relationship relationship to remove from the configured connections
     */
    public void removeConnection(final Relationship relationship) {
        this.connections.remove(relationship);
    }

    /**
     * Replaces the configured connections with a copy of the given set.
     *
     * @param connections the new connections; {@code null} clears them
     */
    public void setConnections(final Set<Relationship> connections) {
        // Keep a private, mutable copy so that addConnection()/removeConnection() remain usable
        // afterwards and later changes to the caller's set do not leak into this context.
        if (connections == null) {
            this.connections = new HashSet<Relationship>();
        } else {
            this.connections = new HashSet<Relationship>(connections);
        }
    }

    /**
     * Reports whether the current value of the property contains Expression Language.
     *
     * @param property the property to inspect
     * @return {@code false} if the property is null, does not support Expression Language, is
     * unknown to the component or has no value; otherwise whether any expression range is found
     */
    @Override
    public boolean isExpressionLanguagePresent(final PropertyDescriptor property) {
        if (property == null || !property.isExpressionLanguageSupported()) {
            return false;
        }

        // getProperty() yields null for descriptors the component does not know
        final PropertyValue propertyValue = getProperty(property);
        if (propertyValue == null) {
            return false;
        }

        // an unset property without a default has nothing that could contain an expression
        final String rawValue = propertyValue.getValue();
        if (rawValue == null) {
            return false;
        }

        final List<Range> elRanges = Query.extractExpressionRanges(rawValue);
        return (elRanges != null && !elRanges.isEmpty());
    }

    /**
     * @return the state manager this context was created with
     */
    @Override
    public StateManager getStateManager() {
        return stateManager;
    }

    /**
     * @return the component name given at construction; empty if none was given
     */
    @Override
    public String getName() {
        return componentName;
    }

    /**
     * @param maxConcurrentTasks value to report from {@link #getMaxConcurrentTasks()}
     */
    protected void setMaxConcurrentTasks(int maxConcurrentTasks) {
        this.maxConcurrentTasks = maxConcurrentTasks;
    }

    /**
     * @return the clustered flag ({@code false} unless changed)
     */
    @Override
    public boolean isClustered() {
        return isClustered;
    }

    /**
     * @return the configured-for-clustering flag ({@code false} unless changed)
     */
    @Override
    public boolean isConfiguredForClustering() {
        return isConfiguredForClustering;
    }

    /**
     * @return the primary-node flag ({@code false} unless changed)
     */
    @Override
    public boolean isPrimary() {
        return isPrimaryNode;
    }

    /**
     * @param clustered value to report from {@link #isClustered()}
     */
    public void setClustered(boolean clustered) {
        isClustered = clustered;
    }

    /**
     * @param isConfiguredForClustering value to report from {@link #isConfiguredForClustering()}
     */
    public void setIsConfiguredForClustering(final boolean isConfiguredForClustering) {
        this.isConfiguredForClustering = isConfiguredForClustering;
    }

    /**
     * @param primaryNode value to report from {@link #isPrimary()}
     * @throws IllegalArgumentException if {@code primaryNode} is {@code true} while this context
     * is not configured for clustering
     */
    public void setPrimaryNode(boolean primaryNode) {
        if (!isConfiguredForClustering && primaryNode) {
            throw new IllegalArgumentException("Primary node is only available in cluster. Use setIsConfiguredForClustering(true) first.");
        }
        isPrimaryNode = primaryNode;
    }

    /**
     * @return the {@link InputRequirement} annotation found on the component's class at
     * construction time, or {@code null} if there is none
     */
    @Override
    public InputRequirement getInputRequirement() {
        return inputRequirement;
    }

    /**
     * @param connected value to report from {@link #isConnectedToCluster()}
     */
    public void setConnected(boolean connected) {
        isConnected = connected;
    }

    /**
     * @return the connected flag ({@code true} unless changed)
     */
    @Override
    public boolean isConnectedToCluster() {
        return isConnected;
    }
}
