MINIFICPP-1288 - Extract FlowController::loadShutdownTimeoutFromConfiguration()
Simplify timeout logic in FlowController::stop()
Remove unused arguments from stop methods and set default waiting time for FlowController::stop to zero
Clean FlowController from redundant whitespaces, comments and unused member fields

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #856
diff --git a/controller/Controller.h b/controller/Controller.h
index 92d863c..726d3b6 100644
--- a/controller/Controller.h
+++ b/controller/Controller.h
@@ -331,7 +331,7 @@
   controller->load();
   controller->start();
   std::this_thread::sleep_for(std::chrono::milliseconds(10000));
-  controller->stop(true);
+  controller->stop();
 }
 
 #endif /* CONTROLLER_CONTROLLER_H_ */
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index af06f57..c40c573 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -117,7 +117,7 @@
     return -1;
   }
   // Unload the current flow YAML, clean the root process group and all its children
-  int16_t stop(bool force, uint64_t timeToWait = 0) override;
+  int16_t stop() override;
   int16_t applyUpdate(const std::string &source, const std::string &configuration) override;
   int16_t drainRepositories() override {
     return -1;
@@ -319,19 +319,15 @@
   std::vector<BackTrace> getTraces() override;
 
   void initializeC2();
-
   void stopC2();
 
  protected:
   void loadC2ResponseConfiguration();
-
   void loadC2ResponseConfiguration(const std::string &prefix);
-
   std::shared_ptr<state::response::ResponseNode> loadC2ResponseConfiguration(const std::string &prefix, std::shared_ptr<state::response::ResponseNode>);
 
   // function to load the flow file repo.
   void loadFlowRepo();
-
   void initializeExternalComponents();
 
   /**
@@ -339,13 +335,11 @@
    */
   virtual void initializePaths(const std::string &adjustedFilename);
 
+  utils::optional<std::chrono::milliseconds> loadShutdownTimeoutFromConfiguration();
+
   // flow controller mutex
   std::recursive_mutex mutex_;
 
-  // Configuration File Name
-  std::string configuration_file_name_;
-  // NiFi property File Name
-  std::string properties_file_name_;
   // Root Process Group
   std::shared_ptr<core::ProcessGroup> root_;
   // Whether it is running
@@ -354,7 +348,6 @@
 
   // conifiguration filename
   std::string configuration_filename_;
-
   std::atomic<bool> c2_initialized_;
   std::atomic<bool> flow_update_;
   std::atomic<bool> c2_enabled_;
@@ -362,61 +355,41 @@
   std::atomic<bool> initialized_;
   // Provenance Repo
   std::shared_ptr<core::Repository> provenance_repo_;
-
   // FlowFile Repo
   std::shared_ptr<core::Repository> flow_file_repo_;
-
   std::shared_ptr<core::ContentRepository> content_repo_;
-
   // Thread pool for schedulers
   utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_;
-  // Flow Engines
   // Flow Timer Scheduler
   std::shared_ptr<TimerDrivenSchedulingAgent> timer_scheduler_;
   // Flow Event Scheduler
   std::shared_ptr<EventDrivenSchedulingAgent> event_scheduler_;
   // Cron Schedule
   std::shared_ptr<CronDrivenSchedulingAgent> cron_scheduler_;
-  // Controller Service
-  // Config
-  // Site to Site Server Listener
-  // Heart Beat
   // FlowControl Protocol
   std::unique_ptr<FlowControlProtocol> protocol_;
-
   std::shared_ptr<Configure> configuration_;
-
   std::shared_ptr<core::controller::ControllerServiceMap> controller_service_map_;
-
   std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider_;
   // flow configuration object.
   std::unique_ptr<core::FlowConfiguration> flow_configuration_;
-
   // metrics information
-
   std::chrono::steady_clock::time_point start_time_;
-
   mutable std::mutex metrics_mutex_;
   // root_nodes cache
   std::map<std::string, std::shared_ptr<state::response::ResponseNode>> root_response_nodes_;
-
   // metrics cache
   std::map<std::string, std::shared_ptr<state::response::ResponseNode>> device_information_;
-
   // metrics cache
   std::map<std::string, std::shared_ptr<state::response::ResponseNode>> component_metrics_;
-
   std::map<uint8_t, std::vector<std::shared_ptr<state::response::ResponseNode>>> component_metrics_by_id_;
-
   // metrics last run
   std::chrono::steady_clock::time_point last_metrics_capture_;
 
  private:
   std::chrono::milliseconds shutdown_check_interval_{1000};
-
   std::shared_ptr<logging::Logger> logger_;
   std::string serial_number_;
-
   std::unique_ptr<state::UpdateController> c2_agent_;
 };
 
diff --git a/libminifi/include/core/state/ProcessorController.h b/libminifi/include/core/state/ProcessorController.h
index 3085839..33a7e30 100644
--- a/libminifi/include/core/state/ProcessorController.h
+++ b/libminifi/include/core/state/ProcessorController.h
@@ -60,7 +60,7 @@
   /**
    * Stop the client
    */
-  virtual int16_t stop(bool force, uint64_t timeToWait = 0);
+  virtual int16_t stop();
 
   virtual bool isRunning();
 
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
index d2729a7..ad3f5d3 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -162,7 +162,7 @@
   /**
    * Stop the client
    */
-  virtual int16_t stop(bool force, uint64_t timeToWait = 0) = 0;
+  virtual int16_t stop() = 0;
 
   virtual bool isRunning() = 0;
 
diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp
index 4dafd24..966b0dd 100644
--- a/libminifi/src/FlowControlProtocol.cpp
+++ b/libminifi/src/FlowControlProtocol.cpp
@@ -363,7 +363,7 @@
     return 0;
   } else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) {
     logger_->log_trace("Flow Control Protocol stop flow controller");
-    this->_controller->stop(true);
+    this->_controller->stop();
     this->_seqNumber++;
     utils::file::FileUtils::close(_socket);
     _socket = 0;
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index cc67fdd..e088628 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -169,8 +169,19 @@
   }
 }
 
+utils::optional<std::chrono::milliseconds> FlowController::loadShutdownTimeoutFromConfiguration() {
+  std::string shutdown_timeout_str;
+  if (configuration_->get(minifi::Configure::nifi_flowcontroller_drain_timeout, shutdown_timeout_str)) {
+    const utils::optional<core::TimePeriodValue> time_from_config = core::TimePeriodValue::fromString(shutdown_timeout_str);
+    if (time_from_config) {
+      return { std::chrono::milliseconds{ time_from_config.value().getMilliseconds() }};
+    }
+  }
+  return utils::nullopt;
+}
+
 FlowController::~FlowController() {
-  stop(true);
+  stop();
   stopC2();
   unload();
   protocol_ = nullptr;
@@ -200,7 +211,7 @@
   updating_ = true;
 
   std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
-  stop(true);
+  stop();
   unload();
   controller_map_->clear();
   auto prevRoot = std::move(this->root_);
@@ -234,7 +245,7 @@
   return started;
 }
 
-int16_t FlowController::stop(bool force, uint64_t timeToWait) {
+int16_t FlowController::stop() {
   std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
   if (running_) {
     // immediately indicate that we are not running
@@ -244,26 +255,12 @@
       this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_, [] (const std::shared_ptr<core::Processor>& proc) -> bool {
         return !proc->hasIncomingConnections();
       });
-      auto shutdown_start = std::chrono::steady_clock::now();
       // we enable C2 to progressively increase the timeout
       // in case it sees that waiting for a little longer could
       // allow the FlowFiles to be processed
-      auto shutdown_timeout = [&]() -> std::chrono::milliseconds {
-        if (timeToWait != 0) {
-          return std::chrono::milliseconds{timeToWait};
-        }
-        utils::optional<core::TimePeriodValue> shutdown_timeout;
-        std::string shutdown_timeout_str;
-        if (configuration_->get(minifi::Configure::nifi_flowcontroller_drain_timeout, shutdown_timeout_str)) {
-          shutdown_timeout = core::TimePeriodValue::fromString(shutdown_timeout_str);
-        }
-        if (shutdown_timeout) {
-          return std::chrono::milliseconds{shutdown_timeout->getMilliseconds()};
-        }
-        return std::chrono::milliseconds{0};
-      };
-      std::size_t count;
-      while ((std::chrono::steady_clock::now() - shutdown_start) < shutdown_timeout() && (count = this->root_->getTotalFlowFileCount()) != 0) {
+      auto shutdown_start = std::chrono::steady_clock::now();
+      while ((std::chrono::steady_clock::now() - shutdown_start) < loadShutdownTimeoutFromConfiguration().value_or(std::chrono::milliseconds{0}) &&
+          this->root_->getTotalFlowFileCount() != 0) {
         std::this_thread::sleep_for(shutdown_check_interval_);
       }
       // shutdown all other processors as well
@@ -316,7 +313,7 @@
 void FlowController::unload() {
   std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
   if (running_) {
-    stop(true);
+    stop();
   }
   if (initialized_) {
     logger_->log_info("Unload Flow Controller");
@@ -328,7 +325,7 @@
 void FlowController::load(const std::shared_ptr<core::ProcessGroup> &root, bool reload) {
   std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
   if (running_) {
-    stop(true);
+    stop();
   }
   if (!initialized_) {
     if (root) {
@@ -342,13 +339,9 @@
     }
 
     this->root_ = root == nullptr ? std::shared_ptr<core::ProcessGroup>(flow_configuration_->getRoot(configuration_filename_)) : root;
-
     logger_->log_info("Loaded root processor Group");
-
     logger_->log_info("Initializing timers");
-
     controller_service_provider_ = flow_configuration_->getControllerServiceProvider();
-
     auto base_shared_ptr = std::dynamic_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this());
 
     if (!thread_pool_.isRunning() || reload) {
@@ -357,7 +350,6 @@
       thread_pool_.setControllerServiceProvider(base_shared_ptr);
       thread_pool_.start();
     }
-
     if (nullptr == timer_scheduler_ || reload) {
       timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
     }
@@ -447,7 +439,7 @@
     if (c2_enabled_ && class_str.empty()) {
       logger_->log_error("Class name must be defined when C2 is enabled");
       std::cerr << "Class name must be defined when C2 is enabled" << std::endl;
-      stop(true);
+      stop();
       exit(1);
     }
   } else {
@@ -485,19 +477,15 @@
   std::string class_csv;
   if (root_ != nullptr) {
     std::shared_ptr<state::response::QueueMetrics> queueMetrics = std::make_shared<state::response::QueueMetrics>();
-
     std::map<std::string, std::shared_ptr<Connection>> connections;
     root_->getConnections(connections);
     for (auto con : connections) {
       queueMetrics->addConnection(con.second);
     }
     device_information_[queueMetrics->getName()] = queueMetrics;
-
     std::shared_ptr<state::response::RepositoryMetrics> repoMetrics = std::make_shared<state::response::RepositoryMetrics>();
-
     repoMetrics->addRepository(provenance_repo_);
     repoMetrics->addRepository(flow_file_repo_);
-
     device_information_[repoMetrics->getName()] = repoMetrics;
   }
 
@@ -506,29 +494,22 @@
 
     for (std::string clazz : classes) {
       auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
-
       if (nullptr == ptr) {
         logger_->log_error("No metric defined for %s", clazz);
         continue;
       }
-
       std::shared_ptr<state::response::ResponseNode> processor = std::static_pointer_cast<state::response::ResponseNode>(ptr);
-
       auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(processor);
-
       if (identifier != nullptr) {
         identifier->setIdentifier(identifier_str);
-
         identifier->setAgentClass(class_str);
       }
-
       auto monitor = std::dynamic_pointer_cast<state::response::AgentMonitor>(processor);
       if (monitor != nullptr) {
         monitor->addRepository(provenance_repo_);
         monitor->addRepository(flow_file_repo_);
         monitor->setStateMonitor(shared_from_this());
       }
-
       auto flowMonitor = std::dynamic_pointer_cast<state::response::FlowMonitor>(processor);
       std::map<std::string, std::shared_ptr<Connection>> connections;
       root_->getConnections(connections);
@@ -537,31 +518,23 @@
           flowMonitor->addConnection(con.second);
         }
         flowMonitor->setStateMonitor(shared_from_this());
-
         flowMonitor->setFlowVersion(flow_configuration_->getFlowVersion());
       }
-
       std::lock_guard<std::mutex> lock(metrics_mutex_);
-
       root_response_nodes_[processor->getName()] = processor;
     }
   }
 
   if (configuration_->get("nifi.flow.metrics.classes", class_csv)) {
     std::vector<std::string> classes = utils::StringUtils::split(class_csv, ",");
-
     for (std::string clazz : classes) {
       auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
-
       if (nullptr == ptr) {
         logger_->log_error("No metric defined for %s", clazz);
         continue;
       }
-
       std::shared_ptr<state::response::ResponseNode> processor = std::static_pointer_cast<state::response::ResponseNode>(ptr);
-
       std::lock_guard<std::mutex> lock(metrics_mutex_);
-
       device_information_[processor->getName()] = processor;
     }
   }
@@ -646,16 +619,13 @@
 
         if (configuration_->get(nameOption.str(), name)) {
           std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name);
-
           if (configuration_->get(classOption.str(), class_definitions)) {
             std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ",");
-
             for (std::string clazz : classes) {
               std::lock_guard<std::mutex> lock(metrics_mutex_);
 
               // instantiate the object
               auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
-
               if (nullptr == ptr) {
                 auto metric = component_metrics_.find(clazz);
                 if (metric != component_metrics_.end()) {
@@ -665,9 +635,7 @@
                   continue;
                 }
               }
-
               auto node = std::dynamic_pointer_cast<state::response::ResponseNode>(ptr);
-
               std::static_pointer_cast<state::response::ObjectNode>(new_node)->add_node(node);
             }
 
@@ -675,8 +643,6 @@
             std::stringstream optionName;
             optionName << option.str() << "." << name;
             auto node = loadC2ResponseConfiguration(optionName.str(), new_node);
-//            if (node != nullptr && new_node != node)
-            //            std::static_pointer_cast<state::response::ObjectNode>(new_node)->add_node(node);
           }
 
           root_response_nodes_[name] = new_node;
@@ -718,13 +684,10 @@
           } else {
             if (configuration_->get(classOption.str(), class_definitions)) {
               std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ",");
-
               for (std::string clazz : classes) {
                 std::lock_guard<std::mutex> lock(metrics_mutex_);
-
                 // instantiate the object
                 auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
-
                 if (nullptr == ptr) {
                   auto metric = component_metrics_.find(clazz);
                   if (metric != component_metrics_.end()) {
@@ -736,7 +699,6 @@
                 }
 
                 auto node = std::dynamic_pointer_cast<state::response::ResponseNode>(ptr);
-
                 std::static_pointer_cast<state::response::ObjectNode>(new_node)->add_node(node);
               }
               if (!new_node->isEmpty())
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 9b46323..0ea31d2 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -446,7 +446,7 @@
             logger_->log_debug("Clearing state for component %s", component->getComponentName());
             auto state_manager = state_manager_provider->getCoreComponentStateManager(component->getComponentUUID());
             if (state_manager != nullptr) {
-              component->stop(true);
+              component->stop();
               state_manager->clear();
               state_manager->persist();
               component->start();
@@ -473,7 +473,7 @@
       handle_describe(resp);
       break;
     case Operation::RESTART: {
-      update_sink_->stop(true);
+      update_sink_->stop();
       C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
       protocol_.load()->consumePayload(std::move(response));
       restart_agent();
@@ -490,7 +490,7 @@
       for (auto &component : components) {
         logger_->log_debug("Stopping component %s", component->getComponentName());
         if (resp.op == Operation::STOP)
-          component->stop(true);
+          component->stop();
         else
           component->start();
       }
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp
index ab8e364..9b7162d 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -103,7 +103,7 @@
           int size = stream->readUTF(componentStr);
           if ( size != -1 ) {
             auto components = update_sink_->getComponents(componentStr);
-            for (auto component : components) {
+            for (const auto& component : components) {
               component->start();
             }
           } else {
@@ -117,8 +117,8 @@
           int size = stream->readUTF(componentStr);
           if ( size != -1 ) {
             auto components = update_sink_->getComponents(componentStr);
-            for (auto component : components) {
-              component->stop(true, 1000);
+            for (const auto& component : components) {
+              component->stop();
             }
           } else {
             logger_->log_debug("Connection broke");
diff --git a/libminifi/src/core/state/ProcessorController.cpp b/libminifi/src/core/state/ProcessorController.cpp
index a6a6e80..edc766a 100644
--- a/libminifi/src/core/state/ProcessorController.cpp
+++ b/libminifi/src/core/state/ProcessorController.cpp
@@ -42,7 +42,7 @@
 /**
  * Stop the client
  */
-int16_t ProcessorController::stop(bool force, uint64_t timeToWait) {
+int16_t ProcessorController::stop() {
   scheduler_->unschedule(processor_);
   return 0;
 }
diff --git a/libminifi/test/flow-tests/FlowControllerTests.cpp b/libminifi/test/flow-tests/FlowControllerTests.cpp
index 731e742..d3b08af 100644
--- a/libminifi/test/flow-tests/FlowControllerTests.cpp
+++ b/libminifi/test/flow-tests/FlowControllerTests.cpp
@@ -116,7 +116,7 @@
     REQUIRE(it.second->getQueueSize() > 10);
   }
 
-  controller->stop(true);
+  controller->stop();
 
   REQUIRE(sinkProc->trigger_count == 0);
 
@@ -150,7 +150,7 @@
   REQUIRE(sourceProc->trigger_count.load() == 1);
 
   execSinkPromise.set_value();
-  controller->stop(true);
+  controller->stop();
 
   REQUIRE(sourceProc->trigger_count.load() == 1);
   REQUIRE(sinkProc->trigger_count.load() == 3);
@@ -187,7 +187,7 @@
   REQUIRE(sourceProc->trigger_count.load() == 1);
 
   execSinkPromise.set_value();
-  controller->stop(true);
+  controller->stop();
 
   REQUIRE(sourceProc->trigger_count.load() == 1);
   REQUIRE(sinkProc->trigger_count.load() == 1);
@@ -227,7 +227,7 @@
 
   std::thread shutdownThread([&]{
     execSinkPromise.set_value();
-    controller->stop(true);
+    controller->stop();
   });
 
   auto shutdownInitiated = std::chrono::steady_clock::now();
diff --git a/libminifi/test/flow-tests/TestControllerWithFlow.h b/libminifi/test/flow-tests/TestControllerWithFlow.h
index 25149de..25cd151 100644
--- a/libminifi/test/flow-tests/TestControllerWithFlow.h
+++ b/libminifi/test/flow-tests/TestControllerWithFlow.h
@@ -62,7 +62,7 @@
   }
 
   ~TestControllerWithFlow() {
-    controller_->stop(true);
+    controller_->stop();
     controller_->unload();
     LogTestController::getInstance().reset();
   }
diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp
index c5b3962..188f515 100644
--- a/libminifi/test/unit/ControllerTests.cpp
+++ b/libminifi/test/unit/ControllerTests.cpp
@@ -52,7 +52,7 @@
   /**
    * Stop the client
    */
-  virtual int16_t stop(bool force, uint64_t timeToWait = 0) {
+  virtual int16_t stop() {
     is_running = false;
     return 0;
   }
@@ -105,7 +105,7 @@
   /**
    * Stop the client
    */
-  virtual int16_t stop(bool force, uint64_t timeToWait = 0) {
+  virtual int16_t stop() {
     is_running = false;
     return 0;
   }
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index cce7bf6..96283d9 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -254,12 +254,12 @@
     return 0;
   }
 
-  int16_t stop(bool force, uint64_t timeToWait = 0) override {
+  int16_t stop() override {
     running_.store(false);
     return 0;
   }
   void waitUnload(const uint64_t timeToWaitMs) override {
-    stop(true);
+    stop();
   }
 
   int16_t pause() override {
@@ -267,7 +267,7 @@
   }
 
   void unload() override {
-    stop(true);
+    stop();
   }
 
   bool isRunning() override {