MINIFICPP-1158 - Event driven processors can starve each other

Signed-off-by: Arpad Boda <aboda@apache.org>

Approved by szaszm and bakaid on GH

This closes #735
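
Background: each scheduling agent used to construct its own thread pool, and
EventDrivenSchedulingAgent::run() looped inside a worker thread for as long as
the processor was running, so a busy event-driven processor could pin a thread
indefinitely and starve the others. With this change all agents share one
FlowController-owned pool, and run() returns a utils::TaskRescheduleInfo so
each invocation works for at most one time slice before yielding its thread
back to the pool. A minimal sketch of the new contract (the driver loop and
requeue helper are hypothetical; the types match the diff below):

    // Hypothetical pool-side driver illustrating the new contract: run()
    // reports completion plus a relative wait, instead of encoding a
    // "milliseconds since epoch" deadline in a raw uint64_t.
    utils::TaskRescheduleInfo info = agent->run(processor, context, session_factory);
    if (!info.finished_) {
      // A zero wait (RetryImmediately) means "requeue as soon as any worker
      // is free", which is what prevents one processor from monopolizing a
      // thread for its whole lifetime.
      requeue_after(info.wait_time_);  // hypothetical helper
    }
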
diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
index 85af8c6..83935bf 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -34,7 +34,7 @@
 
 class CoapIntegrationBase : public IntegrationBase {
  public:
-  CoapIntegrationBase(uint64_t waitTime = 60000)
+  CoapIntegrationBase(uint64_t waitTime = 5000)
       : IntegrationBase(waitTime),
         server(nullptr) {
   }
diff --git a/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp b/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp
index a525404..40da9e2 100644
--- a/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp
+++ b/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp
@@ -36,37 +36,25 @@
 
 class TestHarness : public IntegrationBase {
  public:
-  TestHarness() {
-    log_entry_found = false;
-  }
-
-  void testSetup() {
+  void testSetup() override {
     LogTestController::getInstance().setTrace<minifi::FlowController>();
-	LogTestController::getInstance().setTrace<core::ProcessSession>();
-	LogTestController::getInstance().setTrace<core::ProcessContextExpr>();
+    LogTestController::getInstance().setTrace<core::ProcessSession>();
+    LogTestController::getInstance().setTrace<core::ProcessContextExpr>();
     LogTestController::getInstance().setInfo<processors::LogAttribute>();
   }
 
-  void cleanup() {
+  void cleanup() override {}
+
+  void runAssertions() override {
+    assert(LogTestController::getInstance().contains("key:route_check_attr value:good"));
+    assert(LogTestController::getInstance().contains("key:variable_attribute value:replacement_value"));
+    assert(LogTestController::getInstance().contains("ProcessSession rollback", std::chrono::seconds(1)) == false);  // No rollback happened
   }
 
-  void runAssertions() {
-    assert(log_entry_found);
-  }
-
-  void waitToVerifyProcessor() {
-    // This test takes a while to complete -> wait at most 10 secs
-    log_entry_found = LogTestController::getInstance().contains("key:route_check_attr value:good", std::chrono::seconds(10));
-    log_entry_found = LogTestController::getInstance().contains("key:variable_attribute value:replacement_value", std::chrono::seconds(10));
-  }
-
-  void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
+  void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) override {
     // inject the variable into the context.
     configuration->set("nifi.variable.test", "replacement_value");
   }
-
- protected:
-  bool log_entry_found;
 };
 
 int main(int argc, char **argv) {
diff --git a/extensions/standard-processors/processors/GetTCP.cpp b/extensions/standard-processors/processors/GetTCP.cpp
index afa32dc..5a8ab19 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -225,8 +225,7 @@
     }
   }
 
-  utils::ThreadPool<int> pool = utils::ThreadPool<int>(concurrent_handlers_);
-  client_thread_pool_ = std::move(pool);
+  client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
   client_thread_pool_.start();
 
   running_ = true;
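
ThreadPool is no longer movable (its copy and move operations are deleted in
ThreadPool.h below), so GetTCP now reconfigures its long-lived member pool in
place. A minimal sketch, assuming a default-constructed pool:

    // Sketch: resize an existing pool rather than move-assign a fresh one.
    // setMaxConcurrentTasks() shuts the pool down first if it is running and
    // restarts it afterwards (see ThreadPool.h below).
    utils::ThreadPool<int> client_thread_pool_;  // member, constructed once
    client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
    client_thread_pool_.start();
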
diff --git a/extensions/standard-processors/processors/GetTCP.h b/extensions/standard-processors/processors/GetTCP.h
index 7b9e0a5..8796669 100644
--- a/extensions/standard-processors/processors/GetTCP.h
+++ b/extensions/standard-processors/processors/GetTCP.h
@@ -69,9 +69,9 @@
       return false;
   }
 
-  virtual int64_t wait_time() {
+  virtual std::chrono::milliseconds wait_time() {
     // wait 500ms
-    return 500;
+    return std::chrono::milliseconds(500);
   }
 
  protected:
diff --git a/libminifi/include/CronDrivenSchedulingAgent.h b/libminifi/include/CronDrivenSchedulingAgent.h
index 0943570..9abe3b5 100644
--- a/libminifi/include/CronDrivenSchedulingAgent.h
+++ b/libminifi/include/CronDrivenSchedulingAgent.h
@@ -41,16 +41,18 @@
    * Create a new event driven scheduling agent.
    */
   CronDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
-                            std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
-      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration) {
+                            std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration,
+                            utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
+      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) {
   }
   // Destructor
   virtual ~CronDrivenSchedulingAgent() {
   }
   // Run function for the thread
-  uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
+  utils::TaskRescheduleInfo run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+      const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
-  virtual void stop() {
+  void stop() override {
     std::lock_guard<std::mutex> locK(mutex_);
     schedules_.clear();
     last_exec_.clear();
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
index b434de5..295becc 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -20,6 +20,8 @@
 #ifndef __EVENT_DRIVEN_SCHEDULING_AGENT_H__
 #define __EVENT_DRIVEN_SCHEDULING_AGENT_H__
 
+#define DEFAULT_TIME_SLICE_MS 500
+
 #include "core/logging/Logger.h"
 #include "core/Processor.h"
 #include "core/ProcessContext.h"
@@ -39,14 +41,19 @@
    * Create a new event driven scheduling agent.
    */
   EventDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
-                             std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
-      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration) {
+                             std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration,
+                             utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
+      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) {
+    int slice = configuration->getInt(Configure::nifi_flow_engine_event_driven_time_slice, DEFAULT_TIME_SLICE_MS);
+    if (slice < 10 || 1000 < slice) {
+      throw Exception(FLOW_EXCEPTION, std::string(Configure::nifi_flow_engine_event_driven_time_slice) + " is out of range (valid: 10-1000 ms)");
+    }
+    time_slice_ = std::chrono::milliseconds(slice);
   }
-  // Destructor
-  virtual ~EventDrivenSchedulingAgent() {
-  }
+
   // Run function for the thread
-  uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
+  utils::TaskRescheduleInfo run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+      const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
  private:
   // Prevent default copy constructor and assignment operation
@@ -54,6 +61,8 @@
   EventDrivenSchedulingAgent(const EventDrivenSchedulingAgent &parent);
   EventDrivenSchedulingAgent &operator=(const EventDrivenSchedulingAgent &parent);
 
+  std::chrono::milliseconds time_slice_;
+
 };
 
 } /* namespace minifi */
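
The time slice is read from the new property registered in Configure.h and
Configure.cpp below, and out-of-range values fail fast at agent construction.
A minimal minifi.properties example (500 ms is the built-in default):

    # Length of one event-driven scheduling time slice, in milliseconds.
    # Values outside [10, 1000] make EventDrivenSchedulingAgent throw a
    # FLOW_EXCEPTION when it is constructed.
    nifi.flow.engine.event.driven.time.slice=500
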
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 2e40bb9..5602452 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -63,9 +63,6 @@
  */
 class FlowController : public core::controller::ControllerServiceProvider, public state::StateManager {
  public:
-  static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10;
-  static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5;
-
   /**
    * Flow controller constructor
    */
@@ -86,22 +83,6 @@
   // Destructor
   virtual ~FlowController();
 
-  // Set MAX TimerDrivenThreads
-  virtual void setMaxTimerDrivenThreads(int number) {
-    max_timer_driven_threads_ = number;
-  }
-  // Get MAX TimerDrivenThreads
-  virtual int getMaxTimerDrivenThreads() {
-    return max_timer_driven_threads_;
-  }
-  // Set MAX EventDrivenThreads
-  virtual void setMaxEventDrivenThreads(int number) {
-    max_event_driven_threads_ = number;
-  }
-  // Get MAX EventDrivenThreads
-  virtual int getMaxEventDrivenThreads() {
-    return max_event_driven_threads_;
-  }
   // Get the provenance repository
   virtual std::shared_ptr<core::Repository> getProvenanceRepository() {
     return this->provenance_repo_;
@@ -222,7 +203,7 @@
    * Enables the controller service services
    * @param serviceNode service node which will be disabled, along with linked services.
    */
-  virtual std::future<uint64_t> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+  virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
 
   /**
    * Enables controller services
@@ -234,7 +215,7 @@
    * Disables controller services
    * @param serviceNode service node which will be disabled, along with linked services.
    */
-  virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+  virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
 
   /**
    * Gets all controller services.
@@ -355,11 +336,6 @@
   std::string properties_file_name_;
   // Root Process Group
   std::shared_ptr<core::ProcessGroup> root_;
-  // MAX Timer Driven Threads
-  int max_timer_driven_threads_;
-  // MAX Event Driven Threads
-  int max_event_driven_threads_;
-  // FlowFile Repo
   // Whether it is running
   std::atomic<bool> running_;
   std::atomic<bool> updating_;
@@ -380,6 +356,8 @@
 
   std::shared_ptr<core::ContentRepository> content_repo_;
 
+  // Thread pool for schedulers
+  utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_;
   // Flow Engines
   // Flow Timer Scheduler
   std::shared_ptr<TimerDrivenSchedulingAgent> timer_scheduler_;
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 6993302..13d7ded 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -28,6 +28,7 @@
 #include <algorithm>
 #include <thread>
 #include "utils/CallBackTimer.h"
+#include "utils/Monitors.h"
 #include "utils/TimeUtil.h"
 #include "utils/ThreadPool.h"
 #include "utils/BackTrace.h"
@@ -49,67 +50,6 @@
 namespace nifi {
 namespace minifi {
 
-/**
- * Uses the wait time for a given worker to determine if it is eligible to run
- */
-class TimerAwareMonitor : public utils::AfterExecute<uint64_t> {
- public:
-  TimerAwareMonitor(std::atomic<bool> *run_monitor)
-      : current_wait_(0),
-        run_monitor_(run_monitor) {
-  }
-  explicit TimerAwareMonitor(TimerAwareMonitor &&other)
-      : AfterExecute(std::move(other)),
-        run_monitor_(std::move(other.run_monitor_)) {
-    current_wait_.store(other.current_wait_.load());
-  }
-  virtual bool isFinished(const uint64_t &result) {
-    current_wait_.store(result);
-    if (*run_monitor_) {
-      return false;
-    }
-    return true;
-  }
-  virtual bool isCancelled(const uint64_t &result) {
-    if (*run_monitor_) {
-      return false;
-    }
-    return true;
-  }
-  /**
-   * Time to wait before re-running this task if necessary
-   * @return milliseconds since epoch after which we are eligible to re-run this task.
-   */
-  virtual int64_t wait_time() {
-    return current_wait_.load();
-  }
- protected:
-
-  std::atomic<uint64_t> current_wait_;
-  std::atomic<bool> *run_monitor_;
-};
-
-class SingleRunMonitor : public TimerAwareMonitor {
- public:
-  SingleRunMonitor(std::atomic<bool> *run_monitor)
-      : TimerAwareMonitor(run_monitor) {
-  }
-  explicit SingleRunMonitor(TimerAwareMonitor &&other)
-      : TimerAwareMonitor(std::move(other)) {
-  }
-  virtual bool isFinished(const uint64_t &result) {
-    if (result == 0) {
-      return true;
-    } else {
-      current_wait_.store(result);
-      if (*run_monitor_) {
-        return false;
-      }
-      return true;
-    }
-  }
-};
-
 // SchedulingAgent Class
 class SchedulingAgent {
  public:
@@ -118,25 +58,18 @@
    * Create a new scheduling agent.
    */
   SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo,
-                  std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
-      : admin_yield_duration_(0),
+                  std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
+      : admin_yield_duration_(),
         bored_yield_duration_(0),
         configure_(configuration),
         content_repo_(content_repo),
+        thread_pool_(thread_pool),
         controller_service_provider_(controller_service_provider),
         logger_(logging::LoggerFactory<SchedulingAgent>::getLogger()),
         alert_time_(configuration->getInt(Configure::nifi_flow_engine_alert_period, SCHEDULING_WATCHDOG_DEFAULT_ALERT_PERIOD_MS)) {
     running_ = false;
     repo_ = repo;
     flow_repo_ = flow_repo;
-    /**
-     * To facilitate traces we cannot use daemon threads -- this could potentially cause blocking on I/O; however, it's a better path
-     * to be able to debug why an agent doesn't work and still allow a restart via updates in these cases.
-     */
-    auto csThreads = configure_->getInt(Configure::nifi_flow_engine_threads, 2);
-    auto pool = utils::ThreadPool<uint64_t>(csThreads, false, controller_service_provider, "SchedulingAgent");
-    thread_pool_ = std::move(pool);
-    thread_pool_.start();
 
     if (alert_time_ > std::chrono::milliseconds(0)) {
       std::function<void(void)> f = std::bind(&SchedulingAgent::watchDogFunc, this);
@@ -166,7 +99,6 @@
   // stop
   virtual void stop() {
     running_ = false;
-    thread_pool_.shutdown();
   }
 
   std::vector<BackTrace> getTraces() {
@@ -175,8 +107,8 @@
 
   void watchDogFunc();
 
-  virtual std::future<uint64_t> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
-  virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+  virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
+  virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
   // schedule, overwritten by different DrivenSchedulingAgent
   virtual void schedule(std::shared_ptr<core::Processor> processor) = 0;
   // unschedule, overwritten by different DrivenSchedulingAgent
@@ -202,7 +134,7 @@
 
   std::shared_ptr<core::ContentRepository> content_repo_;
   // thread pool for components.
-  utils::ThreadPool<uint64_t> thread_pool_;
+  utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_;
   // controller service provider reference
   std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider_;
 
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index ba0998a..c530796 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -45,8 +45,8 @@
    * Create a new threaded scheduling agent.
    */
   ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo,
-                          std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration)
-      : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration),
+                          std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
+      : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool),
         logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) {
   }
   // Destructor
@@ -54,7 +54,7 @@
   }
 
   // Run function for the thread
-  virtual uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+  virtual utils::TaskRescheduleInfo run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
                        const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) = 0;
 
  public:
@@ -73,6 +73,8 @@
   ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent);
   ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
   std::shared_ptr<logging::Logger> logger_;
+
+  std::set<std::string> processors_running_;  // std::set for convenient lookup and removal
 };
 
 } /* namespace minifi */
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index 8398b3a..10aaf77 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -38,17 +38,17 @@
    * Create a new processor
    */
   TimerDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
-                             std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure)
-      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure),
+                             std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure,
+                             utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
+      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure, thread_pool),
         logger_(logging::LoggerFactory<TimerDrivenSchedulingAgent>::getLogger()) {
   }
-  //  Destructor
-  virtual ~TimerDrivenSchedulingAgent() {
-  }
+
   /**
    * Run function that accepts the processor, context and session factory.
    */
-  uint64_t run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
+  utils::TaskRescheduleInfo run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+      const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
  private:
   // Prevent default copy constructor and assignment operation
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h
index 18ac5de..6a147e1 100644
--- a/libminifi/include/core/controller/ControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -26,6 +26,7 @@
 #include "ControllerServiceNode.h"
 #include "ControllerServiceMap.h"
 #include "core/ClassLoader.h"
+#include "utils/Monitors.h"
 
 namespace org {
 namespace apache {
@@ -97,7 +98,7 @@
    * Enables the provided controller service
    * @param serviceNode controller service node.
    */
-  virtual std::future<uint64_t> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) = 0;
+  virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) = 0;
 
   /**
    * Enables the provided controller service nodes
@@ -109,7 +110,7 @@
    * Disables the provided controller service node
    * @param serviceNode controller service node.
    */
-  virtual std::future<uint64_t> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
+  virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
 
   /**
    * Gets a list of all controller services.
diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h
index cc1d51e..6ce6651 100644
--- a/libminifi/include/core/controller/StandardControllerServiceProvider.h
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -103,15 +103,12 @@
 
   }
 
-  std::future<uint64_t> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
+  std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
     if (serviceNode->canEnable()) {
       return agent_->enableControllerService(serviceNode);
     } else {
 
-      std::future<uint64_t> no_run = std::async(std::launch::async, []() {
-        uint64_t ret = 0;
-        return ret;
-      });
+      std::future<utils::TaskRescheduleInfo> no_run = std::async(std::launch::deferred, utils::TaskRescheduleInfo::Done);
       return no_run;
     }
   }
@@ -135,14 +132,11 @@
     }
   }
 
-  std::future<uint64_t> disableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
+  std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) {
     if (!IsNullOrEmpty(serviceNode.get()) && serviceNode->enabled()) {
       return agent_->disableControllerService(serviceNode);
     } else {
-      std::future<uint64_t> no_run = std::async(std::launch::async, []() {
-        uint64_t ret = 0;
-        return ret;
-      });
+      std::future<utils::TaskRescheduleInfo> no_run = std::async(std::launch::deferred, utils::TaskRescheduleInfo::Done);
       return no_run;
     }
   }
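
Switching the no-op future from std::launch::async to std::launch::deferred
avoids spawning a thread just to return a constant; a deferred callable runs
lazily, on the caller's thread, when the future is first waited on. A small
self-contained illustration:

    // Sketch: a deferred future evaluates its callable at get()/wait() time
    // on the calling thread, rather than on a freshly spawned one.
    std::future<utils::TaskRescheduleInfo> no_run =
        std::async(std::launch::deferred, utils::TaskRescheduleInfo::Done);
    // Nothing has run yet; get() invokes TaskRescheduleInfo::Done() here.
    assert(no_run.get().finished_);
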
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
index 4d70d99..8c76e58 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -150,14 +150,14 @@
     return !*running_;
   }
 
-  virtual int64_t wait_time() {
+  virtual std::chrono::milliseconds wait_time() {
     return delay_;
   }
  protected:
 
   std::atomic<bool> *running_;
 
-  int64_t delay_;
+  std::chrono::milliseconds delay_;
 
 };
 
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index 89dbe0e..9250b5b 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -47,6 +47,7 @@
   static const char *nifi_flow_configuration_file_backup_update;
   static const char *nifi_flow_engine_threads;
   static const char *nifi_flow_engine_alert_period;
+  static const char *nifi_flow_engine_event_driven_time_slice;
   static const char *nifi_administrative_yield_duration;
   static const char *nifi_bored_yield_duration;
   static const char *nifi_graceful_shutdown_seconds;
diff --git a/libminifi/include/utils/Monitors.h b/libminifi/include/utils/Monitors.h
new file mode 100644
index 0000000..a9ff485
--- /dev/null
+++ b/libminifi/include/utils/Monitors.h
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_MONITORS_H
+#define NIFI_MINIFI_CPP_MONITORS_H
+
+#include <chrono>
+#include <atomic>
+#if defined(WIN32)
+#include <future>  // This is required to work around a VS2017 bug, see the details below
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task helper that determines
+ * whether or not we will run
+ */
+template<typename T>
+class AfterExecute {
+ public:
+  virtual ~AfterExecute() {
+
+  }
+
+  explicit AfterExecute() {
+
+  }
+
+  explicit AfterExecute(AfterExecute &&other) {
+
+  }
+  virtual bool isFinished(const T &result) = 0;
+  virtual bool isCancelled(const T &result) = 0;
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return the time to wait before this task becomes eligible to run again.
+   */
+  virtual std::chrono::milliseconds wait_time() = 0;
+};
+
+/**
+ * Uses the wait time for a given worker to determine if it is eligible to run
+ */
+class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> {
+ public:
+  TimerAwareMonitor(std::atomic<bool> *run_monitor)
+      : current_wait_(std::chrono::milliseconds(0)),
+        run_monitor_(run_monitor) {
+  }
+  virtual bool isFinished(const std::chrono::milliseconds &result) override {
+    current_wait_.store(result);
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  virtual bool isCancelled(const std::chrono::milliseconds &result) override {
+    if (*run_monitor_) {
+      return false;
+    }
+    return true;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return the time to wait before this task becomes eligible to run again.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+ protected:
+
+  std::atomic<std::chrono::milliseconds> current_wait_;
+  std::atomic<bool> *run_monitor_;
+};
+
+class SingleRunMonitor : public utils::AfterExecute<bool>{
+ public:
+  SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100))
+      : retry_interval_(retry_interval) {
+  }
+
+  virtual bool isFinished(const bool &result) override {
+    return result;
+  }
+  virtual bool isCancelled(const bool &result) override {
+    return false;
+  }
+  virtual std::chrono::milliseconds wait_time() override {
+    return retry_interval_;
+  }
+ protected:
+  const std::chrono::milliseconds retry_interval_;
+};
+
+
+struct TaskRescheduleInfo {
+  TaskRescheduleInfo(bool result, std::chrono::milliseconds wait_time)
+    : wait_time_(wait_time), finished_(result) {}
+  std::chrono::milliseconds wait_time_;
+  bool finished_;
+
+  static TaskRescheduleInfo Done() {
+    return TaskRescheduleInfo(true, std::chrono::milliseconds(0));
+  }
+
+  static TaskRescheduleInfo RetryIn(std::chrono::milliseconds interval) {
+    return TaskRescheduleInfo(false, interval);
+  }
+
+  static TaskRescheduleInfo RetryImmediately() {
+    return TaskRescheduleInfo(false, std::chrono::milliseconds(0));
+  }
+
+#if defined(WIN32)
+ // https://developercommunity.visualstudio.com/content/problem/60897/c-shared-state-futuresstate-default-constructs-the.html
+ // Because of this bug we need to have this object default constructible, which makes no sense otherwise. Hack.
+ private:
+  TaskRescheduleInfo() : wait_time_(std::chrono::milliseconds(0)), finished_(true) {}
+  friend class std::_Associated_state<TaskRescheduleInfo>;
+#endif
+};
+
+class ComplexMonitor : public utils::AfterExecute<TaskRescheduleInfo> {
+ public:
+  ComplexMonitor() = default;
+
+  virtual bool isFinished(const TaskRescheduleInfo &result) override {
+    if (result.finished_) {
+      return true;
+    }
+    current_wait_.store(result.wait_time_);
+    return false;
+  }
+  virtual bool isCancelled(const TaskRescheduleInfo &result) override {
+    return false;
+  }
+  /**
+   * Time to wait before re-running this task if necessary
+   * @return the time to wait before this task becomes eligible to run again.
+   */
+  virtual std::chrono::milliseconds wait_time() override {
+    return current_wait_.load();
+  }
+
+ private:
+  std::atomic<std::chrono::milliseconds> current_wait_ {std::chrono::milliseconds(0)};
+};
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_MONITORS_H
\ No newline at end of file
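
A usage sketch tying the Monitors.h pieces together, mirroring how
SchedulingAgent wires up a controller-service task later in this diff (the
task body and try_do_work() helper here are hypothetical):

    // With a ComplexMonitor as run determinant, the pool keeps re-queueing
    // the worker, waiting wait_time_ between attempts, until it reports Done().
    std::function<utils::TaskRescheduleInfo()> work = [] {
      if (try_do_work())  // hypothetical unit of work
        return utils::TaskRescheduleInfo::Done();
      return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(100));
    };
    auto monitor = utils::make_unique<utils::ComplexMonitor>();
    utils::Worker<utils::TaskRescheduleInfo> worker(work, "task-uuid", std::move(monitor));
    std::future<utils::TaskRescheduleInfo> future;
    thread_pool.execute(std::move(worker), future);
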
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 5bbd3f6..2554dc2 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -30,6 +30,7 @@
 #include <functional>
 
 #include "BackTrace.h"
+#include "Monitors.h"
 #include "core/expect.h"
 #include "controllers/ThreadManagementService.h"
 #include "concurrentqueue.h"
@@ -42,33 +43,6 @@
 namespace utils {
 
 /**
- * Worker task helper that determines
- * whether or not we will run
- */
-template<typename T>
-class AfterExecute {
- public:
-  virtual ~AfterExecute() {
-
-  }
-
-  explicit AfterExecute() {
-
-  }
-
-  explicit AfterExecute(AfterExecute &&other) {
-
-  }
-  virtual bool isFinished(const T &result) = 0;
-  virtual bool isCancelled(const T &result) = 0;
-  /**
-   * Time to wait before re-running this task if necessary
-   * @return milliseconds since epoch after which we are eligible to re-run this task.
-   */
-  virtual int64_t wait_time() = 0;
-};
-
-/**
  * Worker task
  * purpose: Provides a wrapper for the functor
  * and returns a future based on the template argument.
@@ -78,7 +52,7 @@
  public:
   explicit Worker(std::function<T()> &task, const std::string &identifier, std::unique_ptr<AfterExecute<T>> run_determinant)
       : identifier_(identifier),
-        time_slice_(0),
+        next_exec_time_(std::chrono::steady_clock::now()),
         task(task),
         run_determinant_(std::move(run_determinant)) {
     promise = std::make_shared<std::promise<T>>();
@@ -86,7 +60,7 @@
 
   explicit Worker(std::function<T()> &task, const std::string &identifier)
       : identifier_(identifier),
-        time_slice_(0),
+        next_exec_time_(std::chrono::steady_clock::now()),
         task(task),
         run_determinant_(nullptr) {
     promise = std::make_shared<std::promise<T>>();
@@ -94,7 +68,7 @@
 
   explicit Worker(const std::string identifier = "")
       : identifier_(identifier),
-        time_slice_(0) {
+        next_exec_time_(std::chrono::steady_clock::now()) {
   }
 
   virtual ~Worker() {
@@ -104,9 +78,9 @@
   /**
    * Move constructor for worker tasks
    */
-  Worker(Worker &&other)
+  Worker(Worker &&other) noexcept
       : identifier_(std::move(other.identifier_)),
-        time_slice_(std::move(other.time_slice_)),
+        next_exec_time_(std::move(other.next_exec_time_)),
         task(std::move(other.task)),
         run_determinant_(std::move(other.run_determinant_)),
         promise(other.promise) {
@@ -125,7 +99,7 @@
       promise->set_value(result);
       return false;
     }
-    time_slice_ = increment_time(run_determinant_->wait_time());
+    next_exec_time_ += run_determinant_->wait_time();
     return true;
   }
 
@@ -133,59 +107,52 @@
     identifier_ = identifier;
   }
 
-  virtual uint64_t getTimeSlice() {
-    return time_slice_;
+  virtual std::chrono::time_point<std::chrono::steady_clock> getNextExecutionTime() const {
+    return next_exec_time_;
   }
 
-  virtual uint64_t getWaitTime() {
+  virtual std::chrono::milliseconds getWaitTime() const {
     return run_determinant_->wait_time();
   }
 
   Worker<T>(const Worker<T>&) = delete;
-  Worker<T>& operator =(const Worker<T>&) = delete;
+  Worker<T>& operator= (const Worker<T>&) = delete;
 
-  Worker<T>& operator =(Worker<T> &&);
+  Worker<T>& operator= (Worker<T> &&) noexcept;
 
-  std::shared_ptr<std::promise<T>> getPromise();
+  std::shared_ptr<std::promise<T>> getPromise() const;
 
-  const std::string &getIdentifier() {
+  const std::string &getIdentifier() const {
     return identifier_;
   }
- protected:
-
-  inline uint64_t increment_time(const uint64_t &time) {
-    std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
-    auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
-    return millis + time;
-  }
-
+ protected:
   std::string identifier_;
-  uint64_t time_slice_;
+  std::chrono::time_point<std::chrono::steady_clock> next_exec_time_;
   std::function<T()> task;
   std::unique_ptr<AfterExecute<T>> run_determinant_;
   std::shared_ptr<std::promise<T>> promise;
 };
 
 template<typename T>
-class WorkerComparator {
+class DelayedTaskComparator {
  public:
   bool operator()(Worker<T> &a, Worker<T> &b) {
-    return a.getTimeSlice() < b.getTimeSlice();
+    return a.getNextExecutionTime() > b.getNextExecutionTime();
   }
 };
 
 template<typename T>
-Worker<T>& Worker<T>::operator =(Worker<T> && other) {
+Worker<T>& Worker<T>::operator =(Worker<T> && other) noexcept {
   task = std::move(other.task);
   promise = other.promise;
-  time_slice_ = std::move(other.time_slice_);
+  next_exec_time_ = std::move(other.next_exec_time_);
   identifier_ = std::move(other.identifier_);
   run_determinant_ = std::move(other.run_determinant_);
   return *this;
 }
 
 template<typename T>
-std::shared_ptr<std::promise<T>> Worker<T>::getPromise() {
+std::shared_ptr<std::promise<T>> Worker<T>::getPromise() const {
   return promise;
 }
 
@@ -231,18 +198,10 @@
     thread_manager_ = nullptr;
   }
 
-  ThreadPool(const ThreadPool<T> &&other)
-      : daemon_threads_(std::move(other.daemon_threads_)),
-        thread_reduction_count_(0),
-        max_worker_threads_(std::move(other.max_worker_threads_)),
-        adjust_threads_(false),
-        running_(false),
-        controller_service_provider_(std::move(other.controller_service_provider_)),
-        thread_manager_(std::move(other.thread_manager_)),
-        name_(std::move(other.name_)) {
-    current_workers_ = 0;
-    task_count_ = 0;
-  }
+  ThreadPool(const ThreadPool<T> &other) = delete;
+  ThreadPool<T>& operator=(const ThreadPool<T> &other) = delete;
+  ThreadPool(ThreadPool<T> &&other) = delete;
+  ThreadPool<T>& operator=(ThreadPool<T> &&other) = delete;
 
   ~ThreadPool() {
     shutdown();
@@ -268,8 +227,16 @@
   /**
    * Returns true if a task is running.
    */
-  bool isRunning(const std::string &identifier) {
-    return task_status_[identifier] == true;
+  bool isTaskRunning(const std::string &identifier) const {
+    try {
+      return task_status_.at(identifier) == true;
+    } catch (const std::out_of_range &e) {
+      return false;
+    }
+  }
+
+  bool isRunning() const {
+    return running_.load();
   }
 
   std::vector<BackTrace> getTraces() {
@@ -304,44 +271,24 @@
    */
   void setMaxConcurrentTasks(uint16_t max) {
     std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
-    if (running_) {
+    bool was_running = running_;
+    if (was_running) {
       shutdown();
     }
     max_worker_threads_ = max;
-    if (!running_)
+    if (was_running)
       start();
   }
 
-  ThreadPool<T> operator=(const ThreadPool<T> &other) = delete;
-  ThreadPool(const ThreadPool<T> &other) = delete;
-
-  ThreadPool<T> &operator=(ThreadPool<T> &&other) {
+  void setControllerServiceProvider(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider) {
     std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
-    if (other.running_) {
-      other.shutdown();
-    }
-    if (running_) {
+    bool was_running = running_;
+    if (was_running) {
       shutdown();
     }
-    max_worker_threads_ = std::move(other.max_worker_threads_);
-    daemon_threads_ = std::move(other.daemon_threads_);
-    current_workers_ = 0;
-    thread_reduction_count_ = 0;
-
-    thread_queue_ = std::move(other.thread_queue_);
-    worker_queue_ = std::move(other.worker_queue_);
-
-    controller_service_provider_ = std::move(other.controller_service_provider_);
-    thread_manager_ = std::move(other.thread_manager_);
-
-    adjust_threads_ = false;
-
-    if (!running_) {
+    controller_service_provider_ = controller_service_provider;
+    if (was_running)
       start();
-    }
-
-    name_ = other.name_;
-    return *this;
   }
 
  protected:
@@ -372,6 +319,8 @@
   std::vector<std::shared_ptr<WorkerThread>> thread_queue_;
 // manager thread
   std::thread manager_thread_;
+// the thread responsible for moving delayed tasks to the worker queue once they become due
+  std::thread delayed_scheduler_thread_;
 // conditional that's used to adjust the threads
   std::atomic<bool> adjust_threads_;
 // atomic running boolean
@@ -384,9 +333,11 @@
   moodycamel::ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
 // worker queue of worker objects
   moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
-  std::priority_queue<Worker<T>, std::vector<Worker<T>>, WorkerComparator<T>> worker_priority_queue_;
+  std::priority_queue<Worker<T>, std::vector<Worker<T>>, DelayedTaskComparator<T>> delayed_worker_queue_;
 // notification for available work
   std::condition_variable tasks_available_;
+// notification for a newly added delayed task that is due before the current earliest one
+  std::condition_variable delayed_task_available_;
 // map to identify if a task should be
   std::map<std::string, bool> task_status_;
 // manager mutex
@@ -410,248 +361,10 @@
    * Runs worker tasks
    */
   void run_tasks(std::shared_ptr<WorkerThread> thread);
+
+  void manage_delayed_queue();
 };
 
-template<typename T>
-bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
-  {
-    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-    task_status_[task.getIdentifier()] = true;
-  }
-  future = std::move(task.getPromise()->get_future());
-  bool enqueued = worker_queue_.enqueue(std::move(task));
-  if (running_) {
-    tasks_available_.notify_one();
-  }
-
-  task_count_++;
-
-  return enqueued;
-}
-
-template<typename T>
-void ThreadPool<T>::manageWorkers() {
-  for (int i = 0; i < max_worker_threads_; i++) {
-    std::stringstream thread_name;
-    thread_name << name_ << " #" << i;
-    auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
-    worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
-    thread_queue_.push_back(worker_thread);
-    current_workers_++;
-  }
-
-  if (daemon_threads_) {
-    for (auto &thread : thread_queue_) {
-      thread->thread_.detach();
-    }
-  }
-
-// likely don't have a thread manager
-  if (LIKELY(nullptr != thread_manager_)) {
-    while (running_) {
-      auto waitperiod = std::chrono::milliseconds(1) * 500;
-      {
-        if (thread_manager_->isAboveMax(current_workers_)) {
-          auto max = thread_manager_->getMaxConcurrentTasks();
-          auto differential = current_workers_ - max;
-          thread_reduction_count_ += differential;
-        } else if (thread_manager_->shouldReduce()) {
-          if (current_workers_ > 1)
-            thread_reduction_count_++;
-          thread_manager_->reduce();
-        } else if (thread_manager_->canIncrease() && max_worker_threads_ - current_workers_ > 0) {  // increase slowly
-          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-          auto worker_thread = std::make_shared<WorkerThread>();
-          worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
-          if (daemon_threads_) {
-            worker_thread->thread_.detach();
-          }
-          thread_queue_.push_back(worker_thread);
-          current_workers_++;
-        }
-      }
-      {
-        std::shared_ptr<WorkerThread> thread_ref;
-        while (deceased_thread_queue_.try_dequeue(thread_ref)) {
-          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-          if (thread_ref->thread_.joinable())
-            thread_ref->thread_.join();
-          thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), thread_ref), thread_queue_.end());
-        }
-      }
-      std::this_thread::sleep_for(waitperiod);
-    }
-  } else {
-    for (auto &thread : thread_queue_) {
-      if (thread->thread_.joinable())
-        thread->thread_.join();
-    }
-  }
-}
-template<typename T>
-void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
-  auto waitperiod = std::chrono::milliseconds(1) * 100;
-  thread->is_running_ = true;
-  uint64_t wait_decay_ = 0;
-  uint64_t yield_backoff = 10;  // start at 10 ms
-  while (running_.load()) {
-    if (UNLIKELY(thread_reduction_count_ > 0)) {
-      if (--thread_reduction_count_ >= 0) {
-        deceased_thread_queue_.enqueue(thread);
-        thread->is_running_ = false;
-        break;
-      } else {
-        thread_reduction_count_++;
-      }
-    }
-    // if we exceed 500ms of wait due to not being able to run any tasks and there are tasks available, meaning
-    // they are eligible to run per the fact that the thread pool isn't shut down and the tasks are in a runnable state
-    // BUT they've been continually timesliced, we will lower the wait decay to 100ms and continue incrementing from
-    // there. This ensures we don't have arbitrarily long sleep cycles.
-    if (wait_decay_ > 500000000L) {
-      wait_decay_ = 100000000L;
-    }
-    // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
-    // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
-    // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
-    // be more likely to run. This is intentional.
-
-    if (wait_decay_ > 2000) {
-      std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
-    }
-
-    if (current_workers_ < max_worker_threads_) {
-      // we are in a reduced state. due to thread management
-      // let's institute a backoff up to 500ms
-      if (yield_backoff < 500) {
-        yield_backoff += 10;
-      }
-      std::this_thread::sleep_for(std::chrono::milliseconds(yield_backoff));
-    } else {
-      yield_backoff = 10;
-    }
-    Worker<T> task;
-
-    bool prioritized_task = false;
-
-    if (!prioritized_task) {
-      if (!worker_queue_.try_dequeue(task)) {
-        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-        if (worker_priority_queue_.size() > 0) {
-          // this is safe as we are going to immediately pop the queue
-          while (!worker_priority_queue_.empty()) {
-            task = std::move(const_cast<Worker<T>&>(worker_priority_queue_.top()));
-            worker_priority_queue_.pop();
-            worker_queue_.enqueue(std::move(task));
-            continue;
-          }
-
-        }
-        tasks_available_.wait_for(lock, waitperiod);
-        continue;
-      } else {
-        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-        if (!task_status_[task.getIdentifier()]) {
-          continue;
-        }
-      }
-
-      bool wait_to_run = false;
-      if (task.getTimeSlice() > 1) {
-        double wt = (double) task.getWaitTime();
-        auto now = std::chrono::system_clock::now().time_since_epoch();
-        auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
-
-        // if our differential is < 10% of the wait time we will not put the task into a wait state
-        // since requeuing will break the time slice contract.
-        if ((double) task.getTimeSlice() > ms && ((double) (task.getTimeSlice() - ms)) > (wt * .10)) {
-          wait_to_run = true;
-        }
-      }
-      // if we have to wait we re-queue the worker.
-      if (wait_to_run) {
-        {
-          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-          if (!task_status_[task.getIdentifier()]) {
-            continue;
-          }
-          // put it on the priority queue
-          worker_priority_queue_.push(std::move(task));
-        }
-
-        wait_decay_ += 25;
-        continue;
-      }
-    }
-    const bool task_renew = task.run();
-    wait_decay_ = 0;
-    if (task_renew) {
-
-      if (UNLIKELY(task_count_ > current_workers_)) {
-        // even if we have more work to do we will not
-        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-        if (!task_status_[task.getIdentifier()]) {
-          continue;
-        }
-
-        worker_priority_queue_.push(std::move(task));
-      } else {
-        worker_queue_.enqueue(std::move(task));
-      }
-    }
-  }
-  current_workers_--;
-}
-template<typename T>
-void ThreadPool<T>::start() {
-  if (nullptr != controller_service_provider_) {
-    auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager");
-    thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr;
-  } else {
-    thread_manager_ = nullptr;
-  }
-  std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
-  if (!running_) {
-    running_ = true;
-    manager_thread_ = std::move(std::thread(&ThreadPool::manageWorkers, this));
-    if (worker_queue_.size_approx() > 0) {
-      tasks_available_.notify_all();
-    }
-  }
-}
-
-template<typename T>
-void ThreadPool<T>::stopTasks(const std::string &identifier) {
-  std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-  task_status_[identifier] = false;
-}
-
-template<typename T>
-void ThreadPool<T>::shutdown() {
-  if (running_.load()) {
-    std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
-    running_.store(false);
-
-    drain();
-    task_status_.clear();
-    if (manager_thread_.joinable())
-      manager_thread_.join();
-    {
-      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
-      for(const auto &thread : thread_queue_){
-        if (thread->thread_.joinable())
-        thread->thread_.join();
-      }
-      thread_queue_.clear();
-      current_workers_ = 0;
-      while (worker_queue_.size_approx() > 0) {
-        Worker<T> task;
-        worker_queue_.try_dequeue(task);
-      }
-    }
-  }
-}
-
 } /* namespace utils */
 } /* namespace minifi */
 } /* namespace nifi */
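
One subtlety worth noting: DelayedTaskComparator compares with operator>
because std::priority_queue is a max-heap, so inverting the order turns
delayed_worker_queue_ into a min-heap on next execution time and top() is
always the task due soonest. A simplified sketch of the wait loop the new
delayed_scheduler_thread_ runs (manage_delayed_queue() itself is defined
outside the hunks shown here):

    // top() is the earliest-due task thanks to the inverted comparator.
    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
    while (running_) {
      if (delayed_worker_queue_.empty()) {
        delayed_task_available_.wait(lock);
        continue;
      }
      auto next_due = delayed_worker_queue_.top().getNextExecutionTime();
      // Sleep until the earliest task is due, or until an earlier task is
      // inserted (insertion notifies delayed_task_available_); after waking,
      // move every due task to worker_queue_ and notify tasks_available_.
      delayed_task_available_.wait_until(lock, next_due);
    }
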
diff --git a/libminifi/src/CPPLINT.cfg b/libminifi/src/CPPLINT.cfg
index 9205687..bba5060 100644
--- a/libminifi/src/CPPLINT.cfg
+++ b/libminifi/src/CPPLINT.cfg
@@ -1,3 +1,2 @@
-set noparent
 filter=-build/include_order,-build/include_alpha
 exclude_files=ResourceClaim.cpp
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index a874a93..38b9c19 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -29,6 +29,7 @@
 const char *Configure::nifi_flow_configuration_file_backup_update = "nifi.flow.configuration.backup.on.update";
 const char *Configure::nifi_flow_engine_threads = "nifi.flow.engine.threads";
 const char *Configure::nifi_flow_engine_alert_period = "nifi.flow.engine.alert.period";
+const char *Configure::nifi_flow_engine_event_driven_time_slice = "nifi.flow.engine.event.driven.time.slice";
 const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration";
 const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration";
 const char *Configure::nifi_graceful_shutdown_seconds = "nifi.flowcontroller.graceful.shutdown.period";
diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp
index 41ffa96..53c2522 100644
--- a/libminifi/src/CronDrivenSchedulingAgent.cpp
+++ b/libminifi/src/CronDrivenSchedulingAgent.cpp
@@ -32,7 +32,7 @@
 namespace nifi {
 namespace minifi {
 
-uint64_t CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                         const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (this->running_ && processor->isRunning()) {
     std::chrono::system_clock::time_point leap_nanos;
@@ -52,7 +52,7 @@
           // we may be woken up a little early so that we can honor our time.
           // in this case we can return the next time to run with the expectation
           // that the wakeup mechanism gets more granular.
-          return std::chrono::duration_cast<std::chrono::milliseconds>(result - from).count();
+          return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(result - from));
         }
       } else {
         Bosma::Cron schedule(processor->getCronPeriod());
@@ -67,16 +67,15 @@
 
       if (processor->isYield()) {
         // Honor the yield
-        return processor->getYieldTime();
+        return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime()));
       } else if (shouldYield && this->bored_yield_duration_ > 0) {
         // No work to do or need to apply back pressure
-        return this->bored_yield_duration_;
+        return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(this->bored_yield_duration_));
       }
     }
-    auto sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(result - from).count();
-    return sleep_time;
+    return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(result - from));
   }
-  return 0;
+  return utils::TaskRescheduleInfo::Done();
 }
 
 } /* namespace minifi */
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index c56ac58..ef771b9 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -19,9 +19,6 @@
  */
 #include "EventDrivenSchedulingAgent.h"
 #include <chrono>
-#include <memory>
-#include <thread>
-#include <iostream>
 #include "core/Processor.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSessionFactory.h"
@@ -32,28 +29,25 @@
 namespace nifi {
 namespace minifi {
 
-uint64_t EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                          const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  while (this->running_) {
-    bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
-
-    if (processor->isYield()) {
-      // Honor the yield
-      return processor->getYieldTime();
-    } else if (shouldYield && this->bored_yield_duration_ > 0) {
-      // No work to do or need to apply back pressure
-      return this->bored_yield_duration_;
+  if (this->running_) {
+    auto start_time = std::chrono::steady_clock::now();
+    // trigger the processor while it has work to do, but for no longer than one time slice
+    while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) {
+      bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
+      if (processor->isYield()) {
+        // Honor the yield
+        return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime()));
+      } else if (shouldYield) {
+        // No work to do or need to apply back pressure
+        return utils::TaskRescheduleInfo::RetryIn(
+            std::chrono::milliseconds((this->bored_yield_duration_ > 0) ? this->bored_yield_duration_ : 10));  // No work left to do, stand by
+      }
     }
-
-    // Block until work is available
-
-    processor->waitForWork(1000);
-
-    if (!processor->isWorkAvailable()) {
-      return 1000;
-    }
+    return utils::TaskRescheduleInfo::RetryImmediately();  // Let's continue work as soon as a thread is available
   }
-  return 0;
+  return utils::TaskRescheduleInfo::Done();
 }
 
 } /* namespace minifi */
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 9de7822..31160c8 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -74,8 +74,6 @@
                                std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<core::ContentRepository> content_repo, const std::string name, bool headless_mode)
     : core::controller::ControllerServiceProvider(core::getClassName<FlowController>()),
       root_(nullptr),
-      max_timer_driven_threads_(0),
-      max_event_driven_threads_(0),
       running_(false),
       updating_(false),
       c2_enabled_(true),
@@ -84,6 +82,7 @@
       flow_file_repo_(flow_file_repo),
       protocol_(0),
       controller_service_map_(std::make_shared<core::controller::ControllerServiceMap>()),
+      thread_pool_(2, false, nullptr, "Flowcontroller threadpool"),
       timer_scheduler_(nullptr),
       event_scheduler_(nullptr),
       cron_scheduler_(nullptr),
@@ -101,14 +100,11 @@
   }
   id_generator_->generate(uuid_);
   setUUID(uuid_);
-
   flow_update_ = false;
   // Setup the default values
   if (flow_configuration_ != nullptr) {
     configuration_filename_ = flow_configuration_->getConfigurationPath();
   }
-  max_event_driven_threads_ = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
-  max_timer_driven_threads_ = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
   running_ = false;
   initialized_ = false;
   c2_initialized_ = false;
@@ -247,6 +243,7 @@
     this->timer_scheduler_->stop();
     this->event_scheduler_->stop();
     this->cron_scheduler_->stop();
+    this->thread_pool_.shutdown();
     running_ = false;
   }
   return 0;
@@ -313,21 +310,25 @@
 
     controller_service_provider_ = flow_configuration_->getControllerServiceProvider();
 
-    if (nullptr == timer_scheduler_ || reload) {
-      timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(
-          std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_,
-          configuration_);
+    auto base_shared_ptr = std::dynamic_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this());
+
+    if (!thread_pool_.isRunning() || reload) {
+      thread_pool_.shutdown();
+      thread_pool_.setMaxConcurrentTasks(configuration_->getInt(Configure::nifi_flow_engine_threads, 2));
+      thread_pool_.setControllerServiceProvider(base_shared_ptr);
+      thread_pool_.start();
     }
+
+    if (nullptr == timer_scheduler_ || reload) {
+      timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
+    }
+
     if (nullptr == event_scheduler_ || reload) {
-      event_scheduler_ = std::make_shared<EventDrivenSchedulingAgent>(
-          std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_,
-          configuration_);
+      event_scheduler_ = std::make_shared<EventDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
     }
 
     if (nullptr == cron_scheduler_ || reload) {
-      cron_scheduler_ = std::make_shared<CronDrivenSchedulingAgent>(
-          std::static_pointer_cast<core::controller::ControllerServiceProvider>(std::dynamic_pointer_cast<FlowController>(shared_from_this())), provenance_repo_, flow_file_repo_, content_repo_,
-          configuration_);
+      cron_scheduler_ = std::make_shared<CronDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
     }
 
     std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_);
@@ -769,7 +770,7 @@
  * Enables the controller service services
  * @param serviceNode service node which will be disabled, along with linked services.
  */
-std::future<uint64_t> FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<utils::TaskRescheduleInfo> FlowController::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
   return controller_service_provider_->enableControllerService(serviceNode);
 }
 
@@ -784,7 +785,7 @@
  * Disables controller services
  * @param serviceNode service node which will be disabled, along with linked services.
  */
-std::future<uint64_t> FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<utils::TaskRescheduleInfo> FlowController::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
   return controller_service_provider_->disableControllerService(serviceNode);
 }
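
Note: the FlowController changes above hand one shared pool to all three scheduling agents, so event-driven and timer-driven processors now draw from a single worker budget. A minimal sketch of that ownership shape, using the constructor signatures from this patch (the repository/provider variable names are placeholders):

  // Illustrative: one pool shared by reference across the three agents.
  utils::ThreadPool<utils::TaskRescheduleInfo> pool(2, false, nullptr, "Flowcontroller threadpool");
  auto timer = std::make_shared<TimerDrivenSchedulingAgent>(provider, prov_repo, ff_repo, content_repo, config, pool);
  auto event = std::make_shared<EventDrivenSchedulingAgent>(provider, prov_repo, ff_repo, content_repo, config, pool);
  auto cron = std::make_shared<CronDrivenSchedulingAgent>(provider, prov_repo, ff_repo, content_repo, config, pool);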
 
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index a8684c3..65a79ae 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -26,6 +26,7 @@
 #include "Exception.h"
 #include "core/Processor.h"
 #include "utils/ScopeGuard.h"
+#include "utils/GeneralUtils.h"
 
 namespace org {
 namespace apache {
@@ -40,41 +41,41 @@
     return false;
 }
 
-std::future<uint64_t> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<utils::TaskRescheduleInfo> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
   logger_->log_info("Enabling CSN in SchedulingAgent %s", serviceNode->getName());
   // reference the enable function from serviceNode
-  std::function<uint64_t()> f_ex = [serviceNode] {
+  std::function<utils::TaskRescheduleInfo()> f_ex = [serviceNode] {
       serviceNode->enable();
-      return 0;
+      return utils::TaskRescheduleInfo::Done();
     };
 
   // only need to run this once.
-  std::unique_ptr<SingleRunMonitor> monitor = std::unique_ptr<SingleRunMonitor>(new SingleRunMonitor(&running_));
-  utils::Worker<uint64_t> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
+  auto monitor = utils::make_unique<utils::ComplexMonitor>();
+  utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
   // move the functor into the thread pool. While a future is returned
   // we aren't terribly concerned with the result.
-  std::future<uint64_t> future;
+  std::future<utils::TaskRescheduleInfo> future;
   thread_pool_.execute(std::move(functor), future);
   if (future.valid())
     future.wait();
   return future;
 }
 
-std::future<uint64_t> SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
+std::future<utils::TaskRescheduleInfo> SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
   logger_->log_info("Disabling CSN in SchedulingAgent %s", serviceNode->getName());
   // reference the disable function from serviceNode
-  std::function<uint64_t()> f_ex = [serviceNode] {
+  std::function<utils::TaskRescheduleInfo()> f_ex = [serviceNode] {
     serviceNode->disable();
-    return 0;
+    return utils::TaskRescheduleInfo::Done();
   };
 
   // only need to run this once.
-  std::unique_ptr<SingleRunMonitor> monitor = std::unique_ptr<SingleRunMonitor>(new SingleRunMonitor(&running_));
-  utils::Worker<uint64_t> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
+  auto monitor = utils::make_unique<utils::ComplexMonitor>();
+  utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor));
 
   // move the functor into the thread pool. While a future is returned
   // we aren't terribly concerned with the result.
-  std::future<uint64_t> future;
+  std::future<utils::TaskRescheduleInfo> future;
   thread_pool_.execute(std::move(functor), future);
   if (future.valid())
     future.wait();
@@ -121,10 +122,6 @@
   try {
     processor->onTrigger(processContext, sessionFactory);
     processor->decrementActiveTask();
-  } catch (Exception &exception) {
-    // Normal exception
-    logger_->log_debug("Caught Exception %s", exception.what());
-    processor->decrementActiveTask();
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
     processor->yield(admin_yield_duration_);
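
Note: enableControllerService and disableControllerService above share the same submit-and-wait shape. A condensed sketch of that pattern, restricted to calls that appear in this patch (`pool` and the identifier are placeholders):

  // Illustrative: run a one-shot action on the pool and block until it finishes.
  std::function<utils::TaskRescheduleInfo()> action = [] {
    // ... perform the side effect exactly once ...
    return utils::TaskRescheduleInfo::Done();  // Done() => never rescheduled
  };
  utils::Worker<utils::TaskRescheduleInfo> functor(action, "some-uuid", utils::make_unique<utils::ComplexMonitor>());
  std::future<utils::TaskRescheduleInfo> future;
  pool.execute(std::move(functor), future);
  if (future.valid())
    future.wait();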
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
index 0b072d8..c096fbb 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -35,6 +35,7 @@
 #include "core/ProcessContextBuilder.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
+#include "utils/GeneralUtils.h"
 
 namespace org {
 namespace apache {
@@ -44,7 +45,7 @@
 void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processor) {
   std::lock_guard<std::mutex> lock(mutex_);
 
-  admin_yield_duration_ = 0;
+  admin_yield_duration_ = 100;  // Default to 100 ms to avoid burning CPU when processors keep rolling back
   std::string yieldValue;
 
   if (configure_->get(Configure::nifi_administrative_yield_duration, yieldValue)) {
@@ -67,7 +68,7 @@
     return;
   }
 
-  if (thread_pool_.isRunning(processor->getUUIDStr())) {
+  if (thread_pool_.isTaskRunning(processor->getUUIDStr())) {
     logger_->log_warn("Can not schedule threads for processor %s because there are existing threads running", processor->getName());
     return;
   }
@@ -92,25 +93,30 @@
    // create the trigger task for this processor
     processor->incrementActiveTasks();
 
-    std::function<uint64_t()> f_ex = [agent, processor, processContext, sessionFactory] () {
+    std::function<utils::TaskRescheduleInfo()> f_ex = [agent, processor, processContext, sessionFactory] () {
       return agent->run(processor, processContext, sessionFactory);
     };
 
     // create a functor that will be submitted to the thread pool.
-    std::unique_ptr<TimerAwareMonitor> monitor = std::unique_ptr<TimerAwareMonitor>(new TimerAwareMonitor(&running_));
-    utils::Worker<uint64_t> functor(f_ex, processor->getUUIDStr(), std::move(monitor));
+    auto monitor = utils::make_unique<utils::ComplexMonitor>();
+    utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, processor->getUUIDStr(), std::move(monitor));
     // move the functor into the thread pool. While a future is returned
     // we aren't terribly concerned with the result.
-    std::future<uint64_t> future;
+    std::future<utils::TaskRescheduleInfo> future;
     thread_pool_.execute(std::move(functor), future);
   }
   logger_->log_debug("Scheduled thread %d concurrent workers for for process %s", processor->getMaxConcurrentTasks(), processor->getName());
+  processors_running_.insert(processor->getUUIDStr());
   return;
 }
 
 void ThreadedSchedulingAgent::stop() {
   SchedulingAgent::stop();
-  thread_pool_.shutdown();
+  std::lock_guard<std::mutex> lock(mutex_);
+  for (const auto& p : processors_running_) {
+    logger_->log_error("SchedulingAgent stopped before processor was unscheduled: %s", p);
+    thread_pool_.stopTasks(p);
+  }
 }
 
 void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> processor) {
@@ -127,6 +133,8 @@
   processor->clearActiveTask();
 
   processor->setScheduledState(core::STOPPED);
+
+  processors_running_.erase(processor->getUUIDStr());
 }
 
 } /* namespace minifi */
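
Note: because the pool is shared, ThreadedSchedulingAgent::stop() above must not shut the whole pool down any more; it cancels only the tasks of its still-scheduled processors. The cancellation relies on the task_status_ bookkeeping in the new ThreadPool.cpp below, roughly:

  // Illustrative: stopTasks() flips the per-identifier status flag; a worker
  // that later dequeues a task whose flag is false drops it instead of running it.
  thread_pool_.stopTasks(processor->getUUIDStr());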
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 13a3439..1b6b7f6 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -29,20 +29,21 @@
 namespace nifi {
 namespace minifi {
 
-uint64_t TimerDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
+utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                          const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  while (this->running_ && processor->isRunning()) {
+  if (this->running_ && processor->isRunning()) {
     bool shouldYield = this->onTrigger(processor, processContext, sessionFactory);
     if (processor->isYield()) {
       // Honor the yield
-      return processor->getYieldTime();
+      return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime()));
     } else if (shouldYield && this->bored_yield_duration_ > 0) {
       // No work to do or need to apply back pressure
-      return this->bored_yield_duration_;
+      return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(this->bored_yield_duration_));
     }
-    return processor->getSchedulingPeriodNano() / 1000000;
+    return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(
+        std::chrono::nanoseconds(processor->getSchedulingPeriodNano())));
   }
-  return processor->getSchedulingPeriodNano() / 1000000;
+  return utils::TaskRescheduleInfo::Done();
 }
 
 } /* namespace minifi */
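
Note: the run() methods now convert the nanosecond scheduling period with an explicit duration_cast instead of dividing by 1000000. A self-contained example showing the two forms agree:

  #include <cassert>
  #include <chrono>
  #include <cstdint>

  int main() {
    const uint64_t period_nanos = 1500000000;  // 1.5 s scheduling period
    const auto period_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::nanoseconds(period_nanos));
    assert(period_ms.count() == static_cast<int64_t>(period_nanos / 1000000));  // both are 1500
  }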
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index d3e579f..46d0794 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -268,6 +268,7 @@
 
 bool Processor::isWorkAvailable() {
   // We have work if any incoming connection has work
+  std::lock_guard<std::mutex> lock(mutex_);
   bool hasWork = false;
 
   try {
diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp
new file mode 100644
index 0000000..039e136
--- /dev/null
+++ b/libminifi/src/utils/ThreadPool.cpp
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ThreadPool.h"
+#include "core/state/StateManager.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template<typename T>
+void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
+  thread->is_running_ = true;
+  while (running_.load()) {
+    if (UNLIKELY(thread_reduction_count_ > 0)) {
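+      // A shrink request is pending: claim one slot and retire this thread.
+      // If the decrement raced below zero, undo it and keep running.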
+      if (--thread_reduction_count_ >= 0) {
+        deceased_thread_queue_.enqueue(thread);
+        thread->is_running_ = false;
+        break;
+      } else {
+        thread_reduction_count_++;
+      }
+    }
+
+    Worker<T> task;
+    if (worker_queue_.try_dequeue(task)) {
+      {
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        if (!task_status_[task.getIdentifier()]) {
+          continue;
+        }
+      }
+      if (task.run()) {
+        if (task.getNextExecutionTime() <= std::chrono::steady_clock::now()) {
+          // It can be re-enqueued as soon as a worker is available
+          worker_queue_.enqueue(std::move(task));
+          continue;
+        }
+        // The task goes to the delayed queue, as its next execution time is in the future
+        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+        bool need_to_notify =
+            delayed_worker_queue_.empty() ||
+                task.getNextExecutionTime() < delayed_worker_queue_.top().getNextExecutionTime();
+
+        delayed_worker_queue_.push(std::move(task));
+        if (need_to_notify) {
+          delayed_task_available_.notify_all();
+        }
+      }
+    } else {
+      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+      tasks_available_.wait(lock);
+    }
+  }
+  current_workers_--;
+}
+
+template<typename T>
+void ThreadPool<T>::manage_delayed_queue() {
+  while (running_) {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+
+    // Put the tasks ready to run in the worker queue
+    while (!delayed_worker_queue_.empty() &&
+        delayed_worker_queue_.top().getNextExecutionTime() <= std::chrono::steady_clock::now()) {
+      // std::priority_queue::top() only exposes a const reference, so const_cast is needed to move the task out before pop()
+      Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
+      delayed_worker_queue_.pop();
+      worker_queue_.enqueue(std::move(task));
+      tasks_available_.notify_one();
+    }
+    if (delayed_worker_queue_.empty()) {
+      delayed_task_available_.wait(lock);
+    } else {
+      auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(
+          delayed_worker_queue_.top().getNextExecutionTime() - std::chrono::steady_clock::now());
+      delayed_task_available_.wait_for(lock, (std::max)(wait_time, std::chrono::milliseconds(1)));
+    }
+  }
+}
+
+template<typename T>
+bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    task_status_[task.getIdentifier()] = true;
+  }
+  future = std::move(task.getPromise()->get_future());
+  bool enqueued = worker_queue_.enqueue(std::move(task));
+  if (running_) {
+    tasks_available_.notify_one();
+  }
+
+  task_count_++;
+
+  return enqueued;
+}
+
+template<typename T>
+void ThreadPool<T>::manageWorkers() {
+  for (int i = 0; i < max_worker_threads_; i++) {
+    std::stringstream thread_name;
+    thread_name << name_ << " #" << i;
+    auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
+    worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+    thread_queue_.push_back(worker_thread);
+    current_workers_++;
+  }
+
+  if (daemon_threads_) {
+    for (auto &thread : thread_queue_) {
+      thread->thread_.detach();
+    }
+  }
+
+  if (nullptr != thread_manager_) {
+    while (running_) {
+      auto waitperiod = std::chrono::milliseconds(500);
+      {
+        std::unique_lock<std::recursive_mutex> lock(manager_mutex_, std::try_to_lock);
+        if (!lock.owns_lock()) {
+          // Thread pool is being stopped/started or reconfigured; wait a bit and retry
+          std::this_thread::sleep_for(std::chrono::milliseconds(10));
+          continue;
+        }
+        if (thread_manager_->isAboveMax(current_workers_)) {
+          auto max = thread_manager_->getMaxConcurrentTasks();
+          auto differential = current_workers_ - max;
+          thread_reduction_count_ += differential;
+        } else if (thread_manager_->shouldReduce()) {
+          if (current_workers_ > 1)
+            thread_reduction_count_++;
+          thread_manager_->reduce();
+        } else if (thread_manager_->canIncrease() && max_worker_threads_ > current_workers_) {  // increase slowly
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          auto worker_thread = std::make_shared<WorkerThread>();
+          worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+          if (daemon_threads_) {
+            worker_thread->thread_.detach();
+          }
+          thread_queue_.push_back(worker_thread);
+          current_workers_++;
+        }
+        std::shared_ptr<WorkerThread> thread_ref;
+        while (deceased_thread_queue_.try_dequeue(thread_ref)) {
+          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          if (thread_ref->thread_.joinable())
+            thread_ref->thread_.join();
+          thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), thread_ref), thread_queue_.end());
+        }
+      }
+      std::this_thread::sleep_for(waitperiod);
+    }
+  } else {
+    for (auto &thread : thread_queue_) {
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
+  }
+}
+
+template<typename T>
+void ThreadPool<T>::start() {
+  if (nullptr != controller_service_provider_) {
+    auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager");
+    thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr;
+  } else {
+    thread_manager_ = nullptr;
+  }
+
+  std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+  if (!running_) {
+    running_ = true;
+    manager_thread_ = std::thread(&ThreadPool::manageWorkers, this);
+    if (worker_queue_.size_approx() > 0) {
+      tasks_available_.notify_all();
+    }
+
+    std::lock_guard<std::mutex> queue_lock(worker_queue_mutex_);
+    delayed_scheduler_thread_ = std::thread(&ThreadPool<T>::manage_delayed_queue, this);
+  }
+}
+
+template<typename T>
+void ThreadPool<T>::stopTasks(const std::string &identifier) {
+  std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+  task_status_[identifier] = false;
+}
+
+template<typename T>
+void ThreadPool<T>::shutdown() {
+  if (running_.load()) {
+    std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+    running_.store(false);
+
+    drain();
+
+    task_status_.clear();
+    if (manager_thread_.joinable()) {
+      manager_thread_.join();
+    }
+
+    delayed_task_available_.notify_all();
+    if (delayed_scheduler_thread_.joinable()) {
+      delayed_scheduler_thread_.join();
+    }
+
+    for (const auto &thread : thread_queue_) {
+      if (thread->thread_.joinable())
+        thread->thread_.join();
+    }
+
+    thread_queue_.clear();
+    current_workers_ = 0;
+    while (!delayed_worker_queue_.empty()) {
+      delayed_worker_queue_.pop();
+    }
+
+    while (worker_queue_.size_approx() > 0) {
+      Worker<T> task;
+      worker_queue_.try_dequeue(task);
+    }
+  }
+}
+
+template class utils::ThreadPool<utils::TaskRescheduleInfo>;
+template class utils::ThreadPool<int>;
+template class utils::ThreadPool<bool>;
+template class utils::ThreadPool<state::Update>;
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
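
Note: to make the new scheduling loop concrete, a task whose result is RetryIn() lands in delayed_worker_queue_, and manage_delayed_queue() moves it back to worker_queue_ once its execution time arrives. A minimal usage sketch under the types this patch introduces (the identifier "ticker" is a placeholder):

  // Illustrative: a periodic task driven purely by its TaskRescheduleInfo result.
  utils::ThreadPool<utils::TaskRescheduleInfo> pool(4, false, nullptr, "example pool");
  pool.start();

  std::function<utils::TaskRescheduleInfo()> tick = [] {
    // ... one unit of work ...
    return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(500));  // run again in ~500 ms
  };
  utils::Worker<utils::TaskRescheduleInfo> worker(tick, "ticker", utils::make_unique<utils::ComplexMonitor>());
  std::future<utils::TaskRescheduleInfo> future;
  pool.execute(std::move(worker), future);

  // Later: stop re-enqueuing the task, then tear the pool down.
  pool.stopTasks("ticker");
  pool.shutdown();
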
diff --git a/libminifi/test/resources/TestUpdateAttribute.yml b/libminifi/test/resources/TestUpdateAttribute.yml
index f741bfb..522484e 100644
--- a/libminifi/test/resources/TestUpdateAttribute.yml
+++ b/libminifi/test/resources/TestUpdateAttribute.yml
@@ -35,36 +35,30 @@
       id: 2438e3c8-015a-1000-79ca-83af40ec1992
       class: org.apache.nifi.processors.standard.UpdateAttribute
       max concurrent tasks: 1
-      scheduling strategy: TIMER_DRIVEN
-      scheduling period: 1 sec
+      scheduling strategy: EVENT_DRIVEN
       penalization period: 30 sec
       yield period: 1 sec
       run duration nanos: 0
-      auto-terminated relationships list: failure
       Properties:
         route_condition_attr: true
     - name: roa
       id: 2438e3c8-015a-1000-79ca-83af40ec1993
       class: org.apache.nifi.processors.standard.RouteOnAttribute
       max concurrent tasks: 1
-      scheduling strategy: TIMER_DRIVEN
-      scheduling period: 1 sec
+      scheduling strategy: EVENT_DRIVEN
       penalization period: 30 sec
       yield period: 1 sec
       run duration nanos: 0
-      auto-terminated relationships list: failure
       Properties:
         route_matched: ${route_condition_attr}    
     - name: up2
       id: 2438e3c8-015a-1000-79ca-83af40ec1994
       class: org.apache.nifi.processors.standard.UpdateAttribute
       max concurrent tasks: 1
-      scheduling strategy: TIMER_DRIVEN
-      scheduling period: 1 sec
+      scheduling strategy: EVENT_DRIVEN
       penalization period: 30 sec
       yield period: 1 sec
       run duration nanos: 0
-      auto-terminated relationships list: failure
       Properties:
         route_check_attr: good
         variable_attribute: ${nifi.variable.test}
@@ -72,12 +66,12 @@
       id: 2438e3c8-015a-1000-79ca-83af40ec1995
       class: org.apache.nifi.processors.standard.LogAttribute
       max concurrent tasks: 1
-      scheduling strategy: TIMER_DRIVEN
-      scheduling period: 1 sec
+      scheduling strategy: EVENT_DRIVEN
       penalization period: 30 sec
       yield period: 1 sec
       run duration nanos: 0
-      auto-terminated relationships list: success
+      auto-terminated relationships list:
+        - success
       Properties:
 
 Connections:
diff --git a/libminifi/test/unit/BackTraceTests.cpp b/libminifi/test/unit/BackTraceTests.cpp
index 816ff63..806f95c 100644
--- a/libminifi/test/unit/BackTraceTests.cpp
+++ b/libminifi/test/unit/BackTraceTests.cpp
@@ -41,14 +41,14 @@
   ~WorkerNumberExecutions() {
   }
 
-  virtual bool isFinished(const int &result) {
+  bool isFinished(const int &result) override {
     if (result > 0 && ++runs < tasks) {
       return false;
     } else {
       return true;
     }
   }
-  virtual bool isCancelled(const int &result) {
+  bool isCancelled(const int &result) override {
     return false;
   }
 
@@ -56,9 +56,9 @@
     return runs;
   }
 
-  virtual int64_t wait_time() {
+  std::chrono::milliseconds wait_time() override {
     // wait 50ms
-    return 50;
+    return std::chrono::milliseconds(50);
   }
 
  protected:
diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp
index 6849aa6..48301b1 100644
--- a/libminifi/test/unit/ThreadPoolTests.cpp
+++ b/libminifi/test/unit/ThreadPoolTests.cpp
@@ -41,14 +41,14 @@
   ~WorkerNumberExecutions() {
   }
 
-  virtual bool isFinished(const int &result) {
+  bool isFinished(const int &result) override {
     if (result > 0 && ++runs < tasks) {
       return false;
     } else {
       return true;
     }
   }
-  virtual bool isCancelled(const int &result) {
+  bool isCancelled(const int &result) override {
     return false;
   }
 
@@ -56,9 +56,9 @@
     return runs;
   }
 
-  virtual int64_t wait_time() {
+  std::chrono::milliseconds wait_time() override {
     // wait 50ms
-    return 50;
+    return std::chrono::milliseconds(50);
   }
 
  protected:
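
Note: the wait_time() change in both test helpers is the same type-safety fix; returning std::chrono::milliseconds instead of a bare int64_t makes the unit part of the signature. A self-contained illustration:

  #include <chrono>
  #include <thread>

  // With a typed duration the caller needs no unit convention or manual conversion.
  std::chrono::milliseconds wait_time_example() { return std::chrono::milliseconds(50); }

  int main() {
    std::this_thread::sleep_for(wait_time_example());  // sleeps 50 ms, unambiguously
  }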