/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.junit.jupiter.api.Test;

public class TestStandardProcessorTestRunner {

    /**
     * Checks when the {@code @OnStopped} methods of {@link ProcessorWithOnStop} are invoked: neither the variant
     * taking a {@link ProcessContext} nor the no-arg variant may have been called before running or after
     * {@code run(1, false)}, and each must have been called exactly once after {@code run(1, true)}.
     */
    @Test
    public void testProcessContextPassedToOnStoppedMethods() {
        final ProcessorWithOnStop processor = new ProcessorWithOnStop();
        final TestRunner testRunner = TestRunners.newTestRunner(processor);

        // Freshly created runner: no stop callback may have fired yet.
        assertEquals(0, processor.getOnStoppedCallsWithContext());
        assertEquals(0, processor.getOnStoppedCallsWithoutContext());

        // One iteration with the second argument set to false: counters must stay untouched.
        testRunner.run(1, false);
        assertEquals(0, processor.getOnStoppedCallsWithContext());
        assertEquals(0, processor.getOnStoppedCallsWithoutContext());

        // One iteration with the second argument set to true: both callbacks fire exactly once.
        testRunner.run(1, true);
        assertEquals(1, processor.getOnStoppedCallsWithContext());
        assertEquals(1, processor.getOnStoppedCallsWithoutContext());
    }

    /**
     * Enqueues a single FlowFile, runs {@link GoodProcessor} and checks that the FlowFile transferred to
     * "success" satisfies a predicate on both one of its attributes and its content.
     */
    @Test
    public void testAllConditionsMet() {
        final TestRunner runner = new StandardProcessorTestRunner(new GoodProcessor());

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("GROUP_ATTRIBUTE_KEY", "1");
        attributes.put("KeyB", "hihii");
        // Hand the String to the runner instead of calling String.getBytes(), which encodes with the
        // platform-default charset and would make the content check below depend on the JVM configuration.
        runner.enqueue("1,hello\n1,good-bye", attributes);

        runner.run();
        runner.assertAllFlowFilesTransferred(GoodProcessor.REL_SUCCESS, 1);

        // Deliberately uses the relationship name (not the Relationship constant) for this assertion.
        runner.assertAllConditionsMet("success",
                mff -> mff.isAttributeEqual("GROUP_ATTRIBUTE_KEY", "1") && mff.isContentEqual("1,hello\n1,good-bye")
        );
    }

    /**
     * Enqueues two FlowFiles with different attributes, runs {@link GoodProcessor} and checks that every FlowFile
     * transferred to "success" satisfies a predicate composed with {@link Predicate#or(Predicate)}.
     */
    @Test
    public void testAllConditionsMetComplex() {
        final TestRunner runner = new StandardProcessorTestRunner(new GoodProcessor());

        // Each FlowFile gets its own attribute map so the test does not rely on enqueue() copying the map
        // before it is modified. The String overload of enqueue avoids String.getBytes(), which encodes
        // with the platform-default charset.
        final Map<String, String> groupAttributes = new HashMap<>();
        groupAttributes.put("GROUP_ATTRIBUTE_KEY", "1");
        groupAttributes.put("KeyB", "hihii");
        runner.enqueue("1,hello\n1,good-bye", groupAttributes);

        final Map<String, String> ageAttributes = new HashMap<>();
        ageAttributes.put("age", "34");
        runner.enqueue("May Andersson", ageAttributes);

        runner.run();
        runner.assertAllFlowFilesTransferred(GoodProcessor.REL_SUCCESS, 2);

        // A FlowFile passes if it carries either the group attribute or the age attribute.
        final Predicate<MockFlowFile> firstPredicate = mff -> mff.isAttributeEqual("GROUP_ATTRIBUTE_KEY", "1");
        final Predicate<MockFlowFile> either = firstPredicate.or(mff -> mff.isAttributeEqual("age", "34"));

        runner.assertAllConditionsMet("success", either);
    }

    /**
     * Sets the runner's thread count to 5, runs once and checks that the process context reports 5 as its
     * maximum number of concurrent tasks.
     */
    @Test
    public void testNumThreads() {
        final TestRunner testRunner = TestRunners.newTestRunner(new ProcessorWithOnStop());

        testRunner.setThreadCount(5);
        testRunner.run(1, true);

        final int maxConcurrentTasks = testRunner.getProcessContext().getMaxConcurrentTasks();
        assertEquals(5, maxConcurrentTasks);
    }

    /**
     * Runs {@link AddAttributeProcessor} with an iteration count of 5 and checks the resulting routing (3 to
     * success, 2 to failure), that all success FlowFiles carry the {@code KEY} attribute, and that a
     * {@link FlowFileValidator} applied to the success relationship sees the expected attribute value.
     */
    @Test
    public void testFlowFileValidator() {
        final TestRunner testRunner = TestRunners.newTestRunner(new AddAttributeProcessor());

        testRunner.run(5, true);

        testRunner.assertTransferCount(AddAttributeProcessor.REL_SUCCESS, 3);
        testRunner.assertTransferCount(AddAttributeProcessor.REL_FAILURE, 2);
        testRunner.assertAllFlowFilesContainAttribute(AddAttributeProcessor.REL_SUCCESS, AddAttributeProcessor.KEY);

        // Every FlowFile on the success relationship must carry KEY=value.
        final FlowFileValidator hasExpectedValue =
                flowFile -> assertEquals("value", flowFile.getAttribute(AddAttributeProcessor.KEY));
        testRunner.assertAllFlowFiles(AddAttributeProcessor.REL_SUCCESS, hasExpectedValue);
    }

    /**
     * Runs {@link AddAttributeProcessor} with an iteration count of 5 and expects validating all transferred
     * FlowFiles (regardless of relationship) for {@code KEY=value} to raise an {@link AssertionError}.
     */
    @Test
    public void testFailFlowFileValidator() {
        final TestRunner testRunner = TestRunners.newTestRunner(new AddAttributeProcessor());

        testRunner.run(5, true);

        assertThrows(AssertionError.class,
                () -> testRunner.assertAllFlowFiles(
                        flowFile -> assertEquals("value", flowFile.getAttribute(AddAttributeProcessor.KEY))));
    }

    /**
     * Runs {@link AddAttributeProcessor} with an iteration count of 5 and expects the relationship-independent
     * "all FlowFiles contain attribute" assertion for {@code KEY} to raise an {@link AssertionError}.
     */
    @Test
    public void testFailAllFlowFilesContainAttribute() {
        final TestRunner testRunner = TestRunners.newTestRunner(new AddAttributeProcessor());

        testRunner.run(5, true);

        assertThrows(AssertionError.class,
                () -> testRunner.assertAllFlowFilesContainAttribute(AddAttributeProcessor.KEY));
    }

    /**
     * Runs {@link AddAttributeProcessor} with an iteration count of 1 and checks that all transferred FlowFiles
     * contain the {@code KEY} attribute.
     */
    @Test
    public void testAllFlowFilesContainAttribute() {
        final TestRunner testRunner = TestRunners.newTestRunner(new AddAttributeProcessor());

        testRunner.run(1, true);

        testRunner.assertAllFlowFilesContainAttribute(AddAttributeProcessor.KEY);
    }

    /**
     * Exercises the runner's variable accessors: an unset variable reads as {@code null}, a set variable reads
     * back its value, and removing it returns the old value and leaves the variable unset again.
     */
    @Test
    public void testVariables() {
        final String variableName = "hello";
        final String variableValue = "world";
        final TestRunner testRunner = TestRunners.newTestRunner(new AddAttributeProcessor());

        // Unknown before it is set
        assertNull(testRunner.getVariableValue(variableName));

        // Visible once set
        testRunner.setVariable(variableName, variableValue);
        assertEquals(variableValue, testRunner.getVariableValue(variableName));

        // Removal hands back the previous value and clears the variable
        assertEquals(variableValue, testRunner.removeVariable(variableName));
        assertNull(testRunner.getVariableValue(variableName));
    }

    /**
     * Verifies that setting a property on a controller service through the runner yields a valid result, stores
     * the value in the service's configuration held by the process context, and causes
     * {@code onPropertyModified} to be invoked on the service.
     */
    @Test
    public void testControllerServiceUpdateShouldCallOnSetProperty() {
        // Arrange
        // Declared with the concrete type so isOpmCalled() can be queried without downcasting.
        final SimpleTestService testService = new SimpleTestService();
        final AddAttributeProcessor proc = new AddAttributeProcessor();
        final TestRunner runner = TestRunners.newTestRunner(proc);
        final String serviceIdentifier = "test";
        final String pdName = "name";
        final String pdValue = "exampleName";
        try {
            runner.addControllerService(serviceIdentifier, testService);
        } catch (InitializationException e) {
            // Report the exception itself: getMessage() may be null and would hide the failure type.
            fail("Could not add controller service '" + serviceIdentifier + "': " + e);
        }

        assertFalse("onPropertyModified has been called", testService.isOpmCalled());

        // Act
        final ValidationResult vr = runner.setProperty(testService, pdName, pdValue);

        // Assert
        assertTrue(vr.isValid());

        final ControllerServiceConfiguration csConf = ((MockProcessContext) runner.getProcessContext()).getConfiguration(serviceIdentifier);
        final PropertyDescriptor propertyDescriptor = testService.getPropertyDescriptor(pdName);
        final String retrievedPDValue = csConf.getProperties().get(propertyDescriptor);

        assertEquals(pdValue, retrievedPDValue);
        assertTrue("onPropertyModified has not been called", testService.isOpmCalled());
    }

    /**
     * Creates a runner with an explicit processor name and checks that the process context reports that name.
     */
    @Test
    public void testProcessorNameShouldBeSet() {
        final String expectedName = "TestName";
        final TestRunner testRunner = TestRunners.newTestRunner(new AddAttributeProcessor(), expectedName);

        assertEquals(expectedName, testRunner.getProcessContext().getName());
    }

    /**
     * Walks a controller service with a required property through three states (required property missing,
     * property set but service not enabled, service enabled) and checks the validity the runner reports for the
     * service and for the processor at each step.
     */
    @Test
    public void testProcessorInvalidWhenControllerServiceDisabled() {
        final ControllerService testService = new RequiredPropertyTestService();
        final AddAttributeProcessor proc = new AddAttributeProcessor();
        final TestRunner runner = TestRunners.newTestRunner(proc);
        final String serviceIdentifier = "test";
        final String pdValue = "exampleName";
        try {
            runner.addControllerService(serviceIdentifier, testService);
        } catch (InitializationException e) {
            // Report the exception itself: getMessage() may be null and would hide the failure type.
            fail("Could not add controller service '" + serviceIdentifier + "': " + e);
        }

        // controller service invalid due to no value on required property; processor must also be invalid
        runner.assertNotValid(testService);
        runner.assertNotValid();

        // add required property; controller service valid but not enabled; processor must be invalid
        runner.setProperty(testService, RequiredPropertyTestService.namePropertyDescriptor, pdValue);
        runner.assertValid(testService);
        runner.assertNotValid();

        // enable controller service; processor now valid
        runner.enableControllerService(testService);
        runner.assertValid(testService);
        runner.assertValid();
    }

    /**
     * Processor whose {@code onTrigger} does nothing; it only counts how often each of its two
     * {@code @OnStopped} methods (one taking a {@link ProcessContext}, one without arguments) has been invoked.
     */
    private static class ProcessorWithOnStop extends AbstractProcessor {

        /** Number of invocations of the {@code @OnStopped} method that receives a context. */
        private int contextStopCount = 0;
        /** Number of invocations of the no-arg {@code @OnStopped} method. */
        private int plainStopCount = 0;

        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
            // Intentionally empty: only the stop callbacks matter for the tests.
        }

        @OnStopped
        public void onStoppedWithContext(final ProcessContext procContext) {
            contextStopCount += 1;
        }

        @OnStopped
        public void onStoppedWithoutContext() {
            plainStopCount += 1;
        }

        /** @return how many times {@link #onStoppedWithContext(ProcessContext)} has been called */
        public int getOnStoppedCallsWithContext() {
            return contextStopCount;
        }

        /** @return how many times {@link #onStoppedWithoutContext()} has been called */
        public int getOnStoppedCallsWithoutContext() {
            return plainStopCount;
        }

    }

    /**
     * Processor that creates one FlowFile per trigger. On even-numbered invocations (counting from zero) the
     * FlowFile gets the attribute {@code KEY=value} and goes to success; on odd-numbered invocations it goes to
     * failure without the attribute.
     */
    private static class AddAttributeProcessor extends AbstractProcessor {
        public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("success").build();
        public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("failure").build();
        public static final String KEY = "KEY";

        private Set<Relationship> relationships;
        /** Number of completed {@code onTrigger} invocations. */
        private int counter = 0;

        @Override
        protected void init(final ProcessorInitializationContext context) {
            final Set<Relationship> supported = new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
            this.relationships = Collections.unmodifiableSet(supported);
        }

        @Override
        public Set<Relationship> getRelationships() {
            return relationships;
        }

        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
            final boolean evenInvocation = counter % 2 == 0;
            final FlowFile created = session.create();
            if (evenInvocation) {
                // Only even invocations receive the attribute and are routed to success.
                final FlowFile tagged = session.putAttribute(created, KEY, "value");
                session.transfer(tagged, REL_SUCCESS);
            } else {
                session.transfer(created, REL_FAILURE);
            }
            counter++;
        }
    }

    /**
     * Processor that pulls up to 20 FlowFiles per trigger from the session and transfers each of them,
     * unchanged, to the success relationship.
     */
    private static class GoodProcessor extends AbstractProcessor {

        public static final Relationship REL_SUCCESS = new Relationship.Builder()
                .name("success")
                .description("Successfully created FlowFile from ...")
                .build();

        public static final Relationship REL_FAILURE = new Relationship.Builder()
                .name("failure")
                .description("... execution failed. Incoming FlowFile will be penalized and routed to this relationship")
                .build();

        private final Set<Relationship> relationships;

        public GoodProcessor() {
            final Set<Relationship> supported = new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
            this.relationships = Collections.unmodifiableSet(supported);
        }

        @Override
        public Set<Relationship> getRelationships() {
            return relationships;
        }

        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
            final List<FlowFile> batch = session.get(20);
            for (final FlowFile flowFile : batch) {
                session.transfer(flowFile, REL_SUCCESS);
            }
        }
    }

    private static class SimpleTestService extends AbstractControllerService {
        private final String PD_NAME = "name";
        private PropertyDescriptor namePropertyDescriptor = new PropertyDescriptor.Builder()
                .name(PD_NAME)
                .displayName("Controller Service Name")
                .required(false)
                .sensitive(false)
                .allowableValues("exampleName", "anotherExampleName")
                .build();

        private boolean opmCalled = false;

        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
            return Arrays.asList(namePropertyDescriptor);
        }

        public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
            getLogger().info("onPropertyModified called for PD {} with old value {} and new value {}", new Object[]{descriptor.getName(), oldValue, newValue});
            opmCalled = true;
        }

        public boolean isOpmCalled() {
            return opmCalled;
        }
    }

    private static class RequiredPropertyTestService extends AbstractControllerService {
        private static final String PD_NAME = "name";
        protected static final  PropertyDescriptor namePropertyDescriptor = new PropertyDescriptor.Builder()
                .name(PD_NAME)
                .displayName("Controller Service Name")
                .required(true)
                .sensitive(false)
                .allowableValues("exampleName", "anotherExampleName")
                .build();

        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
            return Arrays.asList(namePropertyDescriptor);
        }
    }

    /**
     * Logs an error together with a throwable through {@link MockComponentLog} and checks that the first
     * recorded error message exposes that throwable and the message text prefixed with "{} ".
     */
    @Test
    public void testErrorLogMessageArguments() {
        final String componentName = "name of component";
        final MockComponentLog componentLog = new MockComponentLog("first id", componentName);

        final Throwable cause = new ArithmeticException();
        componentLog.error("expected test error", cause);

        final List<LogMessage> errorMessages = componentLog.getErrorMessages();
        final LogMessage firstError = errorMessages.get(0);

        // checking if the error messages are recorded in the correct throwable argument.
        final String expectedThrowable = "java.lang.ArithmeticException";
        assertEquals(expectedThrowable, firstError.getThrowable().toString());
        assertEquals("{} expected test error", firstError.getMsg());
    }
}
