| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.controller.scheduling; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.nifi.admin.service.AuditService; |
| import org.apache.nifi.annotation.lifecycle.OnScheduled; |
| import org.apache.nifi.annotation.lifecycle.OnStopped; |
| import org.apache.nifi.annotation.lifecycle.OnUnscheduled; |
| import org.apache.nifi.authorization.Authorizer; |
| import org.apache.nifi.bundle.Bundle; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.components.ValidationContext; |
| import org.apache.nifi.components.ValidationResult; |
| import org.apache.nifi.components.Validator; |
| import org.apache.nifi.controller.AbstractControllerService; |
| import org.apache.nifi.controller.ControllerService; |
| import org.apache.nifi.controller.FlowController; |
| import org.apache.nifi.controller.ProcessScheduler; |
| import org.apache.nifi.controller.ProcessorNode; |
| import org.apache.nifi.controller.ScheduledState; |
| import org.apache.nifi.controller.flow.FlowManager; |
| import org.apache.nifi.controller.repository.FlowFileEventRepository; |
| import org.apache.nifi.controller.status.history.StatusHistoryRepository; |
| import org.apache.nifi.events.VolatileBulletinRepository; |
| import org.apache.nifi.groups.ProcessGroup; |
| import org.apache.nifi.nar.ExtensionDiscoveringManager; |
| import org.apache.nifi.nar.StandardExtensionDiscoveringManager; |
| import org.apache.nifi.nar.SystemBundle; |
| import org.apache.nifi.processor.AbstractProcessor; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.provenance.MockProvenanceRepository; |
| import org.apache.nifi.registry.flow.FlowRegistryClient; |
| import org.apache.nifi.registry.variable.FileBasedVariableRegistry; |
| import org.apache.nifi.util.NiFiProperties; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.UUID; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.LockSupport; |
| import java.util.function.Supplier; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Mockito.mock; |
| |
| /** |
| * Validate Processor's life-cycle operation within the context of |
| * {@link FlowController} and {@link StandardProcessScheduler} |
| */ |
| public class ProcessorLifecycleIT { |
| |
| private static final Logger logger = LoggerFactory.getLogger(ProcessorLifecycleIT.class); |
| private static final long SHORT_DELAY_TOLERANCE = 10000L; |
| private static final long MEDIUM_DELAY_TOLERANCE = 15000L; |
| private static final long LONG_DELAY_TOLERANCE = 20000L; |
| |
| private FlowManager flowManager; |
| private Map<String, String> properties = new HashMap<>(); |
| private volatile String propsFile = "src/test/resources/lifecycletest.nifi.properties"; |
| private ProcessScheduler processScheduler; |
| |
| @Before |
| public void before() throws Exception { |
| properties.put("P", "hello"); |
| } |
| |
| @After |
| public void after() throws Exception { |
| if (processScheduler != null) { |
| processScheduler.shutdown(); |
| processScheduler = null; |
| } |
| |
| FileUtils.deleteDirectory(new File("./target/lifecycletest")); |
| } |
| |
| private void assertCondition(final Supplier<Boolean> supplier) { |
| assertCondition(supplier, SHORT_DELAY_TOLERANCE); |
| } |
| |
| private void assertCondition(final Supplier<Boolean> supplier, final long delayToleranceMillis) { |
| final long startTime = System.currentTimeMillis(); |
| while (((System.currentTimeMillis() - startTime) < delayToleranceMillis) && !supplier.get()) { |
| try { |
| Thread.sleep(50); |
| } catch (InterruptedException ex) { |
| Thread.interrupted(); |
| break; |
| } |
| } |
| assertTrue(supplier.get()); |
| } |
| |
| @Test |
| public void validateEnableOperation() throws Exception { |
| final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(); |
| flowManager = fcsb.getFlowManager(); |
| ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString()); |
| final ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), |
| UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate()); |
| |
| assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); |
| assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState()); |
| // validates idempotency |
| for (int i = 0; i < 2; i++) { |
| testProcNode.enable(); |
| } |
| assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); |
| assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState()); |
| testProcNode.disable(); |
| assertCondition(() -> ScheduledState.DISABLED == testProcNode.getScheduledState()); |
| assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState()); |
| } |
| |
| @Test |
| public void validateDisableOperation() throws Exception { |
| final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(); |
| flowManager = fcsb.getFlowManager(); |
| |
| ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString()); |
| final ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), |
| UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate()); |
| testProcNode.setProperties(properties); |
| assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); |
| assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState()); |
| // validates idempotency |
| for (int i = 0; i < 2; i++) { |
| testProcNode.disable(); |
| } |
| assertCondition(() -> ScheduledState.DISABLED == testProcNode.getScheduledState()); |
| assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState()); |
| |
| testProcNode.performValidation(); |
| processScheduler.startProcessor(testProcNode, true); |
| assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState()); |
| } |
| |
| /** |
| * Will validate the idempotent nature of processor start operation which |
| * can be called multiple times without any side-effects. |
| */ |
| @Test |
| public void validateIdempotencyOfProcessorStartOperation() throws Exception { |
| final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(); |
| flowManager = fcsb.getFlowManager(); |
| |
| ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString()); |
| final ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), |
| fcsb.getSystemBundle().getBundleDetails().getCoordinate()); |
| testProcNode.setProperties(properties); |
| TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); |
| |
| // sets the scenario for the processor to run |
| this.noop(testProcessor); |
| |
| testProcNode.performValidation(); |
| processScheduler.startProcessor(testProcNode, true); |
| processScheduler.startProcessor(testProcNode, true); |
| processScheduler.startProcessor(testProcNode, true); |
| |
| Thread.sleep(500); |
| assertCondition(() -> testProcessor.operationNames.size() == 1); |
| assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); |
| } |
| |
| /** |
| * Validates that stop calls are harmless and idempotent if processor is not |
| * in STARTING or RUNNING state. |
| */ |
| @Test |
| public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws Exception { |
| final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(); |
| flowManager = fcsb.getFlowManager(); |
| ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString()); |
| final ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), |
| fcsb.getSystemBundle().getBundleDetails().getCoordinate()); |
| testProcNode.setProperties(properties); |
| TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); |
| assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); |
| // sets the scenario for the processor to run |
| int randomDelayLimit = 3000; |
| this.randomOnTriggerDelay(testProcessor, randomDelayLimit); |
| |
| processScheduler.stopProcessor(testProcNode); |
| assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); |
| assertTrue(testProcessor.operationNames.isEmpty()); |
| } |
| |
| /** |
| * Validates the processors start/stop sequence where the order of |
| * operations can only be @OnScheduled, @OnUnscheduled, @OnStopped. |
| */ |
| @Test |
| @Ignore |
| public void validateSuccessfulAndOrderlyShutdown() throws Exception { |
| final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(); |
| flowManager = fcsb.getFlowManager(); |
| ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString()); |
| ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), |
| fcsb.getSystemBundle().getBundleDetails().getCoordinate()); |
| testProcNode.setProperties(properties); |
| TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); |
| |
| // sets the scenario for the processor to run |
| int randomDelayLimit = 3000; |
| this.randomOnTriggerDelay(testProcessor, randomDelayLimit); |
| |
| testProcNode.setMaxConcurrentTasks(4); |
| testProcNode.setScheduldingPeriod("500 millis"); |
| testProcNode.setAutoTerminatedRelationships(Collections.singleton(new Relationship.Builder().name("success").build())); |
| |
| testGroup.addProcessor(testProcNode); |
| |
| flowManager.getGroup(testGroup.getIdentifier()).startProcessing(); |
| assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE); |
| |
| flowManager.getRootGroup().stopProcessing(); |
| |
| Thread.sleep(randomDelayLimit); // up to randomDelayLimit, otherwise next assertion may fail as the processor still executing |
| |
| // validates that regardless of how many running tasks, lifecycle |
| // operation are invoked atomically (once each). |
| assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE); |
| // . . . hence only 3 operations must be in the list |
| assertCondition(() -> testProcessor.operationNames.size() == 3, SHORT_DELAY_TOLERANCE); |
| // . . . and ordered as @OnScheduled, @OnUnscheduled, @OnStopped |
| assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); |
| assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); |
| assertEquals("@OnStopped", testProcessor.operationNames.get(2)); |
| } |
| |
| /** |
| * Concurrency test that is basically hammers on both stop and start |
| * operation validating their idempotency. |
| */ |
| @Test |
| @Ignore |
| public void validateLifecycleOperationOrderWithConcurrentCallsToStartStop() throws Exception { |
| final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(); |
| flowManager = fcsb.getFlowManager(); |
| |
| ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString()); |
| final ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), |
| fcsb.getSystemBundle().getBundleDetails().getCoordinate()); |
| testProcNode.setProperties(properties); |
| TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); |
| |
| // sets the scenario for the processor to run |
| this.noop(testProcessor); |
| |
| ExecutorService executor = Executors.newFixedThreadPool(100); |
| int startCallsCount = 10000; |
| final CountDownLatch countDownCounter = new CountDownLatch(startCallsCount); |
| assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); |
| final Random random = new Random(); |
| for (int i = 0; i < startCallsCount / 2; i++) { |
| executor.execute(new Runnable() { |
| @Override |
| public void run() { |
| LockSupport.parkNanos(random.nextInt(9000000)); |
| processScheduler.stopProcessor(testProcNode); |
| countDownCounter.countDown(); |
| } |
| }); |
| } |
| for (int i = 0; i < startCallsCount / 2; i++) { |
| executor.execute(new Runnable() { |
| @Override |
| public void run() { |
| LockSupport.parkNanos(random.nextInt(9000000)); |
| processScheduler.startProcessor(testProcNode, true); |
| countDownCounter.countDown(); |
| } |
| }); |
| } |
| assertTrue(countDownCounter.await(1000000, TimeUnit.MILLISECONDS)); |
| String previousOperation = null; |
| for (String operationName : testProcessor.operationNames) { |
| if (previousOperation == null || previousOperation.equals("@OnStopped")) { |
| assertEquals("@OnScheduled", operationName); |
| } else if (previousOperation.equals("@OnScheduled")) { |
| assertEquals("@OnUnscheduled", operationName); |
| } else if (previousOperation.equals("@OnUnscheduled")) { |
| assertTrue(operationName.equals("@OnStopped") || operationName.equals("@OnScheduled")); |
| } |
| previousOperation = operationName; |
| } |
| executor.shutdownNow(); |
| } |
| |
| /** |
| * Validates that processor can be stopped before start sequence finished. |
| */ |
| @Test |
| public void validateProcessorUnscheduledAndStoppedWhenStopIsCalledBeforeProcessorFullyStarted() throws Exception { |
| final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(); |
| flowManager = fcsb.getFlowManager(); |
| |
| ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString()); |
| ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), |
| fcsb.getSystemBundle().getBundleDetails().getCoordinate()); |
| testProcNode.setProperties(properties); |
| TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); |
| |
| // sets the scenario for the processor to run |
| int delay = 200; |
| this.longRunningOnSchedule(testProcessor, delay); |
| |
| testProcNode.performValidation(); |
| processScheduler.startProcessor(testProcNode, true); |
| assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE); |
| |
| processScheduler.stopProcessor(testProcNode); |
| assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE); |
| assertCondition(() -> testProcessor.operationNames.size() == 3, LONG_DELAY_TOLERANCE); |
| assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); |
| assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); |
| assertEquals("@OnStopped", testProcessor.operationNames.get(2)); |
| } |
| |
| /** |
| * Validates that Processor is eventually started once invocation of |
| * |
| * @OnSchedule stopped throwing exceptions. |
| */ |
| @Test |
| public void validateProcessScheduledAfterAdministrativeDelayDueToTheOnScheduledException() throws Exception { |
| final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(); |
| flowManager = fcsb.getFlowManager(); |
| |
| ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString()); |
| ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), |
| fcsb.getSystemBundle().getBundleDetails().getCoordinate()); |
| testProcNode.setProperties(properties); |
| TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); |
| |
| // sets the scenario for the processor to run |
| this.noop(testProcessor); |
| testProcessor.generateExceptionOnScheduled = true; |
| testProcessor.keepFailingOnScheduledTimes = 2; |
| |
| testProcNode.performValidation(); |
| processScheduler.startProcessor(testProcNode, true); |
| assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE); |
| processScheduler.stopProcessor(testProcNode); |
| assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE); |
| } |
| |
| /** |
| * Validates that Processor can be stopped when @OnScheduled constantly |
| * fails. Basically validates that the re-try loop breaks if user initiated |
| * stopProcessor. |
| */ |
| @Test |
| public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() throws Exception { |
| final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(); |
| flowManager = fcsb.getFlowManager(); |
| |
| ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString()); |
| ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), |
| fcsb.getSystemBundle().getBundleDetails().getCoordinate()); |
| testProcNode.setProperties(properties); |
| TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); |
| |
| // sets the scenario for the processor to run |
| this.longRunningOnUnschedule(testProcessor, 100); |
| testProcessor.generateExceptionOnScheduled = true; |
| testProcessor.keepFailingOnScheduledTimes = Integer.MAX_VALUE; |
| |
| testProcNode.performValidation(); |
| processScheduler.startProcessor(testProcNode, true); |
| assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE); |
| processScheduler.stopProcessor(testProcNode); |
| assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE); |
| } |
| |
| /** |
| * Validates that the Processor can be stopped when @OnScheduled blocks |
| * indefinitely but written to react to thread interrupts |
| */ |
| @Test |
| public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception { |
| final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec"); |
| flowManager = fcsb.getFlowManager(); |
| |
| ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString()); |
| ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), |
| fcsb.getSystemBundle().getBundleDetails().getCoordinate()); |
| testProcNode.setProperties(properties); |
| TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); |
| // sets the scenario for the processor to run |
| this.blockingInterruptableOnUnschedule(testProcessor); |
| |
| testProcNode.performValidation(); |
| processScheduler.startProcessor(testProcNode, true); |
| assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), SHORT_DELAY_TOLERANCE); |
| processScheduler.stopProcessor(testProcNode); |
| assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE); |
| } |
| |
| /** |
| * Validates that the Processor can be stopped when @OnScheduled blocks |
| * indefinitely and written to ignore thread interrupts |
| */ |
| @Test |
| public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception { |
| final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "1 sec"); |
| flowManager = fcsb.getFlowManager(); |
| |
| ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString()); |
| ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), |
| fcsb.getSystemBundle().getBundleDetails().getCoordinate()); |
| testProcNode.setProperties(properties); |
| TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); |
| // sets the scenario for the processor to run |
| this.blockingUninterruptableOnUnschedule(testProcessor); |
| |
| testProcNode.performValidation(); |
| processScheduler.startProcessor(testProcNode, true); |
| assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE); |
| processScheduler.stopProcessor(testProcNode); |
| assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), MEDIUM_DELAY_TOLERANCE); |
| } |
| |
| /** |
| * Validates that processor can be stopped if onTrigger() keeps throwing |
| * exceptions. |
| */ |
| @Test |
| public void validateProcessorCanBeStoppedWhenOnTriggerThrowsException() throws Exception { |
| final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest(); |
| flowManager = fcsb.getFlowManager(); |
| |
| ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString()); |
| ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), |
| fcsb.getSystemBundle().getBundleDetails().getCoordinate()); |
| testProcNode.setProperties(properties); |
| TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); |
| |
| // sets the scenario for the processor to run |
| this.noop(testProcessor); |
| testProcessor.generateExceptionOnTrigger = true; |
| |
| testProcNode.performValidation(); |
| processScheduler.startProcessor(testProcNode, true); |
| assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE); |
| processScheduler.disableProcessor(testProcNode); |
| assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE); |
| processScheduler.stopProcessor(testProcNode); |
| assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), LONG_DELAY_TOLERANCE); |
| } |
| |
| |
| /** |
| * Scenario where onTrigger() is executed with random delay limited to |
| * 'delayLimit', yet with guaranteed exit from onTrigger(). |
| */ |
| private void randomOnTriggerDelay(TestProcessor testProcessor, int delayLimit) { |
| EmptyRunnable emptyRunnable = new EmptyRunnable(); |
| RandomOrFixedDelayedRunnable delayedRunnable = new RandomOrFixedDelayedRunnable(delayLimit, true); |
| testProcessor.setScenario(emptyRunnable, emptyRunnable, emptyRunnable, delayedRunnable); |
| } |
| |
| /** |
| * Scenario where @OnSchedule is executed with delay limited to |
| * 'delayLimit'. |
| */ |
| private void longRunningOnSchedule(TestProcessor testProcessor, int delayLimit) { |
| EmptyRunnable emptyRunnable = new EmptyRunnable(); |
| RandomOrFixedDelayedRunnable delayedRunnable = new RandomOrFixedDelayedRunnable(delayLimit, false); |
| testProcessor.setScenario(delayedRunnable, emptyRunnable, emptyRunnable, emptyRunnable); |
| } |
| |
| /** |
| * Scenario where @OnUnschedule is executed with delay limited to |
| * 'delayLimit'. |
| */ |
| private void longRunningOnUnschedule(TestProcessor testProcessor, int delayLimit) { |
| EmptyRunnable emptyRunnable = new EmptyRunnable(); |
| RandomOrFixedDelayedRunnable delayedRunnable = new RandomOrFixedDelayedRunnable(delayLimit, false); |
| testProcessor.setScenario(emptyRunnable, delayedRunnable, emptyRunnable, emptyRunnable); |
| } |
| |
| /** |
| * Scenario where @OnSchedule blocks indefinitely yet interruptible. |
| */ |
| private void blockingInterruptableOnUnschedule(TestProcessor testProcessor) { |
| EmptyRunnable emptyRunnable = new EmptyRunnable(); |
| BlockingInterruptableRunnable blockingRunnable = new BlockingInterruptableRunnable(); |
| testProcessor.setScenario(blockingRunnable, emptyRunnable, emptyRunnable, emptyRunnable); |
| } |
| |
| /** |
| * Scenario where @OnSchedule blocks indefinitely and un-interruptible. |
| */ |
| private void blockingUninterruptableOnUnschedule(TestProcessor testProcessor) { |
| EmptyRunnable emptyRunnable = new EmptyRunnable(); |
| BlockingUninterruptableRunnable blockingRunnable = new BlockingUninterruptableRunnable(); |
| testProcessor.setScenario(blockingRunnable, emptyRunnable, emptyRunnable, emptyRunnable); |
| } |
| |
| /** |
| * Scenario where all tasks are no op. |
| */ |
| private void noop(TestProcessor testProcessor) { |
| EmptyRunnable emptyRunnable = new EmptyRunnable(); |
| testProcessor.setScenario(emptyRunnable, emptyRunnable, emptyRunnable, emptyRunnable); |
| } |
| |
| private FlowManagerAndSystemBundle buildFlowControllerForTest(final String propKey, final String propValue) throws Exception { |
| final Map<String, String> addProps = new HashMap<>(); |
| addProps.put(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec"); |
| addProps.put(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml"); |
| addProps.put(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, "local-provider"); |
| addProps.put(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, MockProvenanceRepository.class.getName()); |
| addProps.put("nifi.remote.input.socket.port", ""); |
| addProps.put("nifi.remote.input.secure", ""); |
| if (propKey != null && propValue != null) { |
| addProps.put(propKey, propValue); |
| } |
| final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, addProps); |
| |
| final Bundle systemBundle = SystemBundle.create(nifiProperties); |
| final ExtensionDiscoveringManager extensionManager = new StandardExtensionDiscoveringManager(); |
| extensionManager.discoverExtensions(systemBundle, Collections.emptySet()); |
| |
| final FlowController flowController = FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), nifiProperties, |
| mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository(), |
| new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()), |
| mock(FlowRegistryClient.class), extensionManager, mock(StatusHistoryRepository.class)); |
| |
| final FlowManager flowManager = flowController.getFlowManager(); |
| this.processScheduler = flowController.getProcessScheduler(); |
| |
| return new FlowManagerAndSystemBundle(flowManager, systemBundle); |
| } |
| |
| private FlowManagerAndSystemBundle buildFlowControllerForTest() throws Exception { |
| return buildFlowControllerForTest(null, null); |
| } |
| |
| |
| private static class FlowManagerAndSystemBundle { |
| |
| private final FlowManager flowManager; |
| private final Bundle systemBundle; |
| |
| public FlowManagerAndSystemBundle(FlowManager flowManager, Bundle systemBundle) { |
| this.flowManager = flowManager; |
| this.systemBundle = systemBundle; |
| } |
| |
| public FlowManager getFlowManager() { |
| return flowManager; |
| } |
| |
| public Bundle getSystemBundle() { |
| return systemBundle; |
| } |
| } |
| |
| /** |
| * |
| */ |
| public static class TestProcessor extends AbstractProcessor { |
| private static final Runnable NOP = () -> { |
| }; |
| |
| private Runnable onScheduleCallback = NOP; |
| private Runnable onUnscheduleCallback = NOP; |
| private Runnable onStopCallback = NOP; |
| private Runnable onTriggerCallback = NOP; |
| |
| private boolean generateExceptionOnScheduled; |
| private boolean generateExceptionOnTrigger; |
| |
| private boolean withService; |
| |
| private int keepFailingOnScheduledTimes; |
| |
| private int onScheduledExceptionCount; |
| |
| private final List<String> operationNames = new LinkedList<>(); |
| |
| void setScenario(Runnable onScheduleCallback, Runnable onUnscheduleCallback, Runnable onStopCallback, Runnable onTriggerCallback) { |
| this.onScheduleCallback = onScheduleCallback; |
| this.onUnscheduleCallback = onUnscheduleCallback; |
| this.onStopCallback = onStopCallback; |
| this.onTriggerCallback = onTriggerCallback; |
| } |
| |
| @OnScheduled |
| public void schedule(ProcessContext ctx) { |
| this.operationNames.add("@OnScheduled"); |
| if (this.generateExceptionOnScheduled |
| && this.onScheduledExceptionCount++ < this.keepFailingOnScheduledTimes) { |
| throw new RuntimeException("Intentional"); |
| } |
| this.onScheduleCallback.run(); |
| } |
| |
| @OnUnscheduled |
| public void unschedule() { |
| this.operationNames.add("@OnUnscheduled"); |
| this.onUnscheduleCallback.run(); |
| } |
| |
| @OnStopped |
| public void stop() { |
| this.operationNames.add("@OnStopped"); |
| this.onStopCallback.run(); |
| } |
| |
| @Override |
| protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| PropertyDescriptor PROP = new PropertyDescriptor.Builder() |
| .name("P") |
| .description("Blah Blah") |
| .required(true) |
| .addValidator(new Validator() { |
| @Override |
| public ValidationResult validate(final String subject, final String value, final ValidationContext context) { |
| return new ValidationResult.Builder().subject(subject).input(value).valid(value != null && !value.isEmpty()).explanation(subject + " cannot be empty").build(); |
| } |
| }) |
| .build(); |
| |
| PropertyDescriptor SERVICE = new PropertyDescriptor.Builder() |
| .name("S") |
| .description("Blah Blah") |
| .required(true) |
| .identifiesControllerService(ITestservice.class) |
| .build(); |
| |
| return this.withService ? Arrays.asList(PROP, SERVICE) : Arrays.asList(PROP); |
| } |
| |
| @Override |
| public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { |
| if (this.generateExceptionOnTrigger) { |
| throw new RuntimeException("Intentional"); |
| } |
| this.onTriggerCallback.run(); |
| } |
| } |
| |
| |
| public static class TestService extends AbstractControllerService implements ITestservice { |
| |
| } |
| |
| public static interface ITestservice extends ControllerService { |
| } |
| |
| |
| private static class EmptyRunnable implements Runnable { |
| |
| @Override |
| public void run() { |
| |
| } |
| } |
| |
| |
| private static class BlockingInterruptableRunnable implements Runnable { |
| |
| @Override |
| public void run() { |
| try { |
| Thread.sleep(Long.MAX_VALUE); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| |
| private static class BlockingUninterruptableRunnable implements Runnable { |
| |
| @Override |
| public void run() { |
| while (true) { |
| try { |
| Thread.sleep(Long.MAX_VALUE); |
| } catch (InterruptedException e) { |
| // ignore |
| } |
| } |
| } |
| } |
| |
| |
| private static class RandomOrFixedDelayedRunnable implements Runnable { |
| |
| private final int delayLimit; |
| private final boolean randomDelay; |
| |
| public RandomOrFixedDelayedRunnable(int delayLimit, boolean randomDelay) { |
| this.delayLimit = delayLimit; |
| this.randomDelay = randomDelay; |
| } |
| |
| Random random = new Random(); |
| |
| @Override |
| public void run() { |
| try { |
| if (this.randomDelay) { |
| Thread.sleep(random.nextInt(this.delayLimit)); |
| } else { |
| Thread.sleep(this.delayLimit); |
| } |
| } catch (InterruptedException e) { |
| logger.warn("Interrupted while sleeping"); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |