MINIFICPP-1288 - Extract FlowController::loadShutdownTimeoutFromConfiguration()
Simplify timeout logic in FlowController::stop()
Remove unused arguments from stop methods and set default waiting time for FlowController::stop to zero
Clean FlowController from redundant whitespaces, comments and unused member fields
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #856
diff --git a/controller/Controller.h b/controller/Controller.h
index 92d863c..726d3b6 100644
--- a/controller/Controller.h
+++ b/controller/Controller.h
@@ -331,7 +331,7 @@
controller->load();
controller->start();
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
- controller->stop(true);
+ controller->stop();
}
#endif /* CONTROLLER_CONTROLLER_H_ */
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index af06f57..c40c573 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -117,7 +117,7 @@
return -1;
}
// Unload the current flow YAML, clean the root process group and all its children
- int16_t stop(bool force, uint64_t timeToWait = 0) override;
+ int16_t stop() override;
int16_t applyUpdate(const std::string &source, const std::string &configuration) override;
int16_t drainRepositories() override {
return -1;
@@ -319,19 +319,15 @@
std::vector<BackTrace> getTraces() override;
void initializeC2();
-
void stopC2();
protected:
void loadC2ResponseConfiguration();
-
void loadC2ResponseConfiguration(const std::string &prefix);
-
std::shared_ptr<state::response::ResponseNode> loadC2ResponseConfiguration(const std::string &prefix, std::shared_ptr<state::response::ResponseNode>);
// function to load the flow file repo.
void loadFlowRepo();
-
void initializeExternalComponents();
/**
@@ -339,13 +335,11 @@
*/
virtual void initializePaths(const std::string &adjustedFilename);
+ utils::optional<std::chrono::milliseconds> loadShutdownTimeoutFromConfiguration();
+
// flow controller mutex
std::recursive_mutex mutex_;
- // Configuration File Name
- std::string configuration_file_name_;
- // NiFi property File Name
- std::string properties_file_name_;
// Root Process Group
std::shared_ptr<core::ProcessGroup> root_;
// Whether it is running
@@ -354,7 +348,6 @@
// conifiguration filename
std::string configuration_filename_;
-
std::atomic<bool> c2_initialized_;
std::atomic<bool> flow_update_;
std::atomic<bool> c2_enabled_;
@@ -362,61 +355,41 @@
std::atomic<bool> initialized_;
// Provenance Repo
std::shared_ptr<core::Repository> provenance_repo_;
-
// FlowFile Repo
std::shared_ptr<core::Repository> flow_file_repo_;
-
std::shared_ptr<core::ContentRepository> content_repo_;
-
// Thread pool for schedulers
utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_;
- // Flow Engines
// Flow Timer Scheduler
std::shared_ptr<TimerDrivenSchedulingAgent> timer_scheduler_;
// Flow Event Scheduler
std::shared_ptr<EventDrivenSchedulingAgent> event_scheduler_;
// Cron Schedule
std::shared_ptr<CronDrivenSchedulingAgent> cron_scheduler_;
- // Controller Service
- // Config
- // Site to Site Server Listener
- // Heart Beat
// FlowControl Protocol
std::unique_ptr<FlowControlProtocol> protocol_;
-
std::shared_ptr<Configure> configuration_;
-
std::shared_ptr<core::controller::ControllerServiceMap> controller_service_map_;
-
std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider_;
// flow configuration object.
std::unique_ptr<core::FlowConfiguration> flow_configuration_;
-
// metrics information
-
std::chrono::steady_clock::time_point start_time_;
-
mutable std::mutex metrics_mutex_;
// root_nodes cache
std::map<std::string, std::shared_ptr<state::response::ResponseNode>> root_response_nodes_;
-
// metrics cache
std::map<std::string, std::shared_ptr<state::response::ResponseNode>> device_information_;
-
// metrics cache
std::map<std::string, std::shared_ptr<state::response::ResponseNode>> component_metrics_;
-
std::map<uint8_t, std::vector<std::shared_ptr<state::response::ResponseNode>>> component_metrics_by_id_;
-
// metrics last run
std::chrono::steady_clock::time_point last_metrics_capture_;
private:
std::chrono::milliseconds shutdown_check_interval_{1000};
-
std::shared_ptr<logging::Logger> logger_;
std::string serial_number_;
-
std::unique_ptr<state::UpdateController> c2_agent_;
};
diff --git a/libminifi/include/core/state/ProcessorController.h b/libminifi/include/core/state/ProcessorController.h
index 3085839..33a7e30 100644
--- a/libminifi/include/core/state/ProcessorController.h
+++ b/libminifi/include/core/state/ProcessorController.h
@@ -60,7 +60,7 @@
/**
* Stop the client
*/
- virtual int16_t stop(bool force, uint64_t timeToWait = 0);
+ virtual int16_t stop();
virtual bool isRunning();
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
index d2729a7..ad3f5d3 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -162,7 +162,7 @@
/**
* Stop the client
*/
- virtual int16_t stop(bool force, uint64_t timeToWait = 0) = 0;
+ virtual int16_t stop() = 0;
virtual bool isRunning() = 0;
diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp
index 4dafd24..966b0dd 100644
--- a/libminifi/src/FlowControlProtocol.cpp
+++ b/libminifi/src/FlowControlProtocol.cpp
@@ -363,7 +363,7 @@
return 0;
} else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) {
logger_->log_trace("Flow Control Protocol stop flow controller");
- this->_controller->stop(true);
+ this->_controller->stop();
this->_seqNumber++;
utils::file::FileUtils::close(_socket);
_socket = 0;
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index cc67fdd..e088628 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -169,8 +169,19 @@
}
}
+utils::optional<std::chrono::milliseconds> FlowController::loadShutdownTimeoutFromConfiguration() {
+ std::string shutdown_timeout_str;
+ if (configuration_->get(minifi::Configure::nifi_flowcontroller_drain_timeout, shutdown_timeout_str)) {
+ const utils::optional<core::TimePeriodValue> time_from_config = core::TimePeriodValue::fromString(shutdown_timeout_str);
+ if (time_from_config) {
+ return { std::chrono::milliseconds{ time_from_config.value().getMilliseconds() }};
+ }
+ }
+ return utils::nullopt;
+}
+
FlowController::~FlowController() {
- stop(true);
+ stop();
stopC2();
unload();
protocol_ = nullptr;
@@ -200,7 +211,7 @@
updating_ = true;
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
- stop(true);
+ stop();
unload();
controller_map_->clear();
auto prevRoot = std::move(this->root_);
@@ -234,7 +245,7 @@
return started;
}
-int16_t FlowController::stop(bool force, uint64_t timeToWait) {
+int16_t FlowController::stop() {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
if (running_) {
// immediately indicate that we are not running
@@ -244,26 +255,12 @@
this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_, [] (const std::shared_ptr<core::Processor>& proc) -> bool {
return !proc->hasIncomingConnections();
});
- auto shutdown_start = std::chrono::steady_clock::now();
// we enable C2 to progressively increase the timeout
// in case it sees that waiting for a little longer could
// allow the FlowFiles to be processed
- auto shutdown_timeout = [&]() -> std::chrono::milliseconds {
- if (timeToWait != 0) {
- return std::chrono::milliseconds{timeToWait};
- }
- utils::optional<core::TimePeriodValue> shutdown_timeout;
- std::string shutdown_timeout_str;
- if (configuration_->get(minifi::Configure::nifi_flowcontroller_drain_timeout, shutdown_timeout_str)) {
- shutdown_timeout = core::TimePeriodValue::fromString(shutdown_timeout_str);
- }
- if (shutdown_timeout) {
- return std::chrono::milliseconds{shutdown_timeout->getMilliseconds()};
- }
- return std::chrono::milliseconds{0};
- };
- std::size_t count;
- while ((std::chrono::steady_clock::now() - shutdown_start) < shutdown_timeout() && (count = this->root_->getTotalFlowFileCount()) != 0) {
+ auto shutdown_start = std::chrono::steady_clock::now();
+ while ((std::chrono::steady_clock::now() - shutdown_start) < loadShutdownTimeoutFromConfiguration().value_or(std::chrono::milliseconds{0}) &&
+ this->root_->getTotalFlowFileCount() != 0) {
std::this_thread::sleep_for(shutdown_check_interval_);
}
// shutdown all other processors as well
@@ -316,7 +313,7 @@
void FlowController::unload() {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
if (running_) {
- stop(true);
+ stop();
}
if (initialized_) {
logger_->log_info("Unload Flow Controller");
@@ -328,7 +325,7 @@
void FlowController::load(const std::shared_ptr<core::ProcessGroup> &root, bool reload) {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
if (running_) {
- stop(true);
+ stop();
}
if (!initialized_) {
if (root) {
@@ -342,13 +339,9 @@
}
this->root_ = root == nullptr ? std::shared_ptr<core::ProcessGroup>(flow_configuration_->getRoot(configuration_filename_)) : root;
-
logger_->log_info("Loaded root processor Group");
-
logger_->log_info("Initializing timers");
-
controller_service_provider_ = flow_configuration_->getControllerServiceProvider();
-
auto base_shared_ptr = std::dynamic_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this());
if (!thread_pool_.isRunning() || reload) {
@@ -357,7 +350,6 @@
thread_pool_.setControllerServiceProvider(base_shared_ptr);
thread_pool_.start();
}
-
if (nullptr == timer_scheduler_ || reload) {
timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
}
@@ -447,7 +439,7 @@
if (c2_enabled_ && class_str.empty()) {
logger_->log_error("Class name must be defined when C2 is enabled");
std::cerr << "Class name must be defined when C2 is enabled" << std::endl;
- stop(true);
+ stop();
exit(1);
}
} else {
@@ -485,19 +477,15 @@
std::string class_csv;
if (root_ != nullptr) {
std::shared_ptr<state::response::QueueMetrics> queueMetrics = std::make_shared<state::response::QueueMetrics>();
-
std::map<std::string, std::shared_ptr<Connection>> connections;
root_->getConnections(connections);
for (auto con : connections) {
queueMetrics->addConnection(con.second);
}
device_information_[queueMetrics->getName()] = queueMetrics;
-
std::shared_ptr<state::response::RepositoryMetrics> repoMetrics = std::make_shared<state::response::RepositoryMetrics>();
-
repoMetrics->addRepository(provenance_repo_);
repoMetrics->addRepository(flow_file_repo_);
-
device_information_[repoMetrics->getName()] = repoMetrics;
}
@@ -506,29 +494,22 @@
for (std::string clazz : classes) {
auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
-
if (nullptr == ptr) {
logger_->log_error("No metric defined for %s", clazz);
continue;
}
-
std::shared_ptr<state::response::ResponseNode> processor = std::static_pointer_cast<state::response::ResponseNode>(ptr);
-
auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(processor);
-
if (identifier != nullptr) {
identifier->setIdentifier(identifier_str);
-
identifier->setAgentClass(class_str);
}
-
auto monitor = std::dynamic_pointer_cast<state::response::AgentMonitor>(processor);
if (monitor != nullptr) {
monitor->addRepository(provenance_repo_);
monitor->addRepository(flow_file_repo_);
monitor->setStateMonitor(shared_from_this());
}
-
auto flowMonitor = std::dynamic_pointer_cast<state::response::FlowMonitor>(processor);
std::map<std::string, std::shared_ptr<Connection>> connections;
root_->getConnections(connections);
@@ -537,31 +518,23 @@
flowMonitor->addConnection(con.second);
}
flowMonitor->setStateMonitor(shared_from_this());
-
flowMonitor->setFlowVersion(flow_configuration_->getFlowVersion());
}
-
std::lock_guard<std::mutex> lock(metrics_mutex_);
-
root_response_nodes_[processor->getName()] = processor;
}
}
if (configuration_->get("nifi.flow.metrics.classes", class_csv)) {
std::vector<std::string> classes = utils::StringUtils::split(class_csv, ",");
-
for (std::string clazz : classes) {
auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
-
if (nullptr == ptr) {
logger_->log_error("No metric defined for %s", clazz);
continue;
}
-
std::shared_ptr<state::response::ResponseNode> processor = std::static_pointer_cast<state::response::ResponseNode>(ptr);
-
std::lock_guard<std::mutex> lock(metrics_mutex_);
-
device_information_[processor->getName()] = processor;
}
}
@@ -646,16 +619,13 @@
if (configuration_->get(nameOption.str(), name)) {
std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name);
-
if (configuration_->get(classOption.str(), class_definitions)) {
std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ",");
-
for (std::string clazz : classes) {
std::lock_guard<std::mutex> lock(metrics_mutex_);
// instantiate the object
auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
-
if (nullptr == ptr) {
auto metric = component_metrics_.find(clazz);
if (metric != component_metrics_.end()) {
@@ -665,9 +635,7 @@
continue;
}
}
-
auto node = std::dynamic_pointer_cast<state::response::ResponseNode>(ptr);
-
std::static_pointer_cast<state::response::ObjectNode>(new_node)->add_node(node);
}
@@ -675,8 +643,6 @@
std::stringstream optionName;
optionName << option.str() << "." << name;
auto node = loadC2ResponseConfiguration(optionName.str(), new_node);
-// if (node != nullptr && new_node != node)
- // std::static_pointer_cast<state::response::ObjectNode>(new_node)->add_node(node);
}
root_response_nodes_[name] = new_node;
@@ -718,13 +684,10 @@
} else {
if (configuration_->get(classOption.str(), class_definitions)) {
std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ",");
-
for (std::string clazz : classes) {
std::lock_guard<std::mutex> lock(metrics_mutex_);
-
// instantiate the object
auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(clazz, clazz);
-
if (nullptr == ptr) {
auto metric = component_metrics_.find(clazz);
if (metric != component_metrics_.end()) {
@@ -736,7 +699,6 @@
}
auto node = std::dynamic_pointer_cast<state::response::ResponseNode>(ptr);
-
std::static_pointer_cast<state::response::ObjectNode>(new_node)->add_node(node);
}
if (!new_node->isEmpty())
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 9b46323..0ea31d2 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -446,7 +446,7 @@
logger_->log_debug("Clearing state for component %s", component->getComponentName());
auto state_manager = state_manager_provider->getCoreComponentStateManager(component->getComponentUUID());
if (state_manager != nullptr) {
- component->stop(true);
+ component->stop();
state_manager->clear();
state_manager->persist();
component->start();
@@ -473,7 +473,7 @@
handle_describe(resp);
break;
case Operation::RESTART: {
- update_sink_->stop(true);
+ update_sink_->stop();
C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
protocol_.load()->consumePayload(std::move(response));
restart_agent();
@@ -490,7 +490,7 @@
for (auto &component : components) {
logger_->log_debug("Stopping component %s", component->getComponentName());
if (resp.op == Operation::STOP)
- component->stop(true);
+ component->stop();
else
component->start();
}
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp
index ab8e364..9b7162d 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -103,7 +103,7 @@
int size = stream->readUTF(componentStr);
if ( size != -1 ) {
auto components = update_sink_->getComponents(componentStr);
- for (auto component : components) {
+ for (const auto& component : components) {
component->start();
}
} else {
@@ -117,8 +117,8 @@
int size = stream->readUTF(componentStr);
if ( size != -1 ) {
auto components = update_sink_->getComponents(componentStr);
- for (auto component : components) {
- component->stop(true, 1000);
+ for (const auto& component : components) {
+ component->stop();
}
} else {
logger_->log_debug("Connection broke");
diff --git a/libminifi/src/core/state/ProcessorController.cpp b/libminifi/src/core/state/ProcessorController.cpp
index a6a6e80..edc766a 100644
--- a/libminifi/src/core/state/ProcessorController.cpp
+++ b/libminifi/src/core/state/ProcessorController.cpp
@@ -42,7 +42,7 @@
/**
* Stop the client
*/
-int16_t ProcessorController::stop(bool force, uint64_t timeToWait) {
+int16_t ProcessorController::stop() {
scheduler_->unschedule(processor_);
return 0;
}
diff --git a/libminifi/test/flow-tests/FlowControllerTests.cpp b/libminifi/test/flow-tests/FlowControllerTests.cpp
index 731e742..d3b08af 100644
--- a/libminifi/test/flow-tests/FlowControllerTests.cpp
+++ b/libminifi/test/flow-tests/FlowControllerTests.cpp
@@ -116,7 +116,7 @@
REQUIRE(it.second->getQueueSize() > 10);
}
- controller->stop(true);
+ controller->stop();
REQUIRE(sinkProc->trigger_count == 0);
@@ -150,7 +150,7 @@
REQUIRE(sourceProc->trigger_count.load() == 1);
execSinkPromise.set_value();
- controller->stop(true);
+ controller->stop();
REQUIRE(sourceProc->trigger_count.load() == 1);
REQUIRE(sinkProc->trigger_count.load() == 3);
@@ -187,7 +187,7 @@
REQUIRE(sourceProc->trigger_count.load() == 1);
execSinkPromise.set_value();
- controller->stop(true);
+ controller->stop();
REQUIRE(sourceProc->trigger_count.load() == 1);
REQUIRE(sinkProc->trigger_count.load() == 1);
@@ -227,7 +227,7 @@
std::thread shutdownThread([&]{
execSinkPromise.set_value();
- controller->stop(true);
+ controller->stop();
});
auto shutdownInitiated = std::chrono::steady_clock::now();
diff --git a/libminifi/test/flow-tests/TestControllerWithFlow.h b/libminifi/test/flow-tests/TestControllerWithFlow.h
index 25149de..25cd151 100644
--- a/libminifi/test/flow-tests/TestControllerWithFlow.h
+++ b/libminifi/test/flow-tests/TestControllerWithFlow.h
@@ -62,7 +62,7 @@
}
~TestControllerWithFlow() {
- controller_->stop(true);
+ controller_->stop();
controller_->unload();
LogTestController::getInstance().reset();
}
diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp
index c5b3962..188f515 100644
--- a/libminifi/test/unit/ControllerTests.cpp
+++ b/libminifi/test/unit/ControllerTests.cpp
@@ -52,7 +52,7 @@
/**
* Stop the client
*/
- virtual int16_t stop(bool force, uint64_t timeToWait = 0) {
+ virtual int16_t stop() {
is_running = false;
return 0;
}
@@ -105,7 +105,7 @@
/**
* Stop the client
*/
- virtual int16_t stop(bool force, uint64_t timeToWait = 0) {
+ virtual int16_t stop() {
is_running = false;
return 0;
}
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index cce7bf6..96283d9 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -254,12 +254,12 @@
return 0;
}
- int16_t stop(bool force, uint64_t timeToWait = 0) override {
+ int16_t stop() override {
running_.store(false);
return 0;
}
void waitUnload(const uint64_t timeToWaitMs) override {
- stop(true);
+ stop();
}
int16_t pause() override {
@@ -267,7 +267,7 @@
}
void unload() override {
- stop(true);
+ stop();
}
bool isRunning() override {