blob: a40b00411514216ab6fb7380a5d4915c02586007 [file] [log] [blame]
/**
* @file FlowController.cpp
* FlowController class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include <map>
#include <chrono>
#include <future>
#include <thread>
#include <utility>
#include <memory>
#include <string>
#include "FlowController.h"
#include "core/state/ProcessorController.h"
#include "core/ProcessGroup.h"
#include "core/Core.h"
#include "SchedulingAgent.h"
#include "core/controller/ControllerServiceProvider.h"
#include "core/controller/ForwardingControllerServiceProvider.h"
#include "core/logging/LoggerConfiguration.h"
#include "core/Connectable.h"
#include "utils/file/PathUtils.h"
#include "utils/file/FileSystem.h"
#include "utils/BaseHTTPClient.h"
#include "io/NetworkPrioritizer.h"
#include "io/FileStream.h"
#include "core/ClassLoader.h"
#include "core/ThreadedRepository.h"
#include "c2/C2MetricsPublisher.h"
namespace org::apache::nifi::minifi {
FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo,
std::shared_ptr<Configure> configure, std::shared_ptr<core::FlowConfiguration> flow_configuration,
std::shared_ptr<core::ContentRepository> content_repo, std::unique_ptr<state::MetricsPublisherStore> metrics_publisher_store,
std::shared_ptr<utils::file::FileSystem> filesystem, std::function<void()> request_restart)
: core::controller::ForwardingControllerServiceProvider(core::getClassName<FlowController>()),
running_(false),
updating_(false),
initialized_(false),
thread_pool_(5, false, nullptr, "Flowcontroller threadpool"),
configuration_(std::move(configure)),
provenance_repo_(std::move(provenance_repo)),
flow_file_repo_(std::move(flow_file_repo)),
content_repo_(std::move(content_repo)),
flow_configuration_(std::move(flow_configuration)),
metrics_publisher_store_(std::move(metrics_publisher_store)),
root_wrapper_(configuration_, metrics_publisher_store_.get()) {
if (provenance_repo_ == nullptr)
throw std::runtime_error("Provenance Repo should not be null");
if (flow_file_repo_ == nullptr)
throw std::runtime_error("Flow Repo should not be null");
if (configuration_ == nullptr) {
throw std::runtime_error("Must supply a configuration.");
}
if (flow_configuration_) {
controller_service_provider_impl_ = flow_configuration_->getControllerServiceProvider();
}
protocol_ = std::make_unique<FlowControlProtocol>(this, configuration_);
if (metrics_publisher_store_) {
metrics_publisher_store_->initialize(this, this);
}
if (c2::isC2Enabled(configuration_)) {
std::shared_ptr<c2::C2MetricsPublisher> c2_metrics_publisher;
if (auto publisher = metrics_publisher_store_->getMetricsPublisher(c2::C2_METRICS_PUBLISHER).lock()) {
c2_metrics_publisher = std::dynamic_pointer_cast<c2::C2MetricsPublisher>(publisher);
}
c2_agent_ = std::make_unique<c2::C2Agent>(configuration_, c2_metrics_publisher, std::move(filesystem), std::move(request_restart));
}
}
FlowController::~FlowController() {
if (c2_agent_) {
c2_agent_->stop();
}
stop();
// TODO(adebreceni): are these here on purpose, so they are destroyed first?
protocol_ = nullptr;
flow_file_repo_ = nullptr;
provenance_repo_ = nullptr;
logger_->log_trace("Destroying FlowController");
}
bool FlowController::applyConfiguration(const std::string &source, const std::string &configurePayload, const std::optional<std::string>& flow_id) {
std::unique_ptr<core::ProcessGroup> newRoot;
try {
newRoot = updateFromPayload(source, configurePayload, flow_id);
} catch (const std::exception& ex) {
logger_->log_error("Invalid configuration payload, type: %s, what: %s", typeid(ex).name(), ex.what());
return false;
} catch (...) {
logger_->log_error("Invalid configuration payload, type: %s", getCurrentExceptionTypeName());
return false;
}
if (newRoot == nullptr)
return false;
if (!isRunning())
return false;
logger_->log_info("Starting to reload Flow Controller with flow control name %s, version %d", newRoot->getName(), newRoot->getVersion());
updating_ = true;
bool started = false;
{
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
stop();
root_wrapper_.setNewRoot(std::move(newRoot));
initialized_ = false;
try {
load(true);
started = start() == 0;
} catch (const std::exception& ex) {
logger_->log_error("Caught exception while starting flow, type %s, what: %s", typeid(ex).name(), ex.what());
} catch (...) {
logger_->log_error("Caught unknown exception while starting flow, type %s", getCurrentExceptionTypeName());
}
if (!started) {
logger_->log_error("Failed to start new flow, restarting previous flow");
root_wrapper_.restoreBackup();
load(true);
start();
} else {
root_wrapper_.clearBackup();
}
}
updating_ = false;
if (started) {
auto flowVersion = flow_configuration_->getFlowVersion();
if (flowVersion) {
logger_->log_debug("Setting flow id to %s", flowVersion->getFlowId());
logger_->log_debug("Setting flow url to %s", flowVersion->getFlowIdentifier()->getRegistryUrl());
configuration_->set(Configure::nifi_c2_flow_id, flowVersion->getFlowId());
configuration_->set(Configure::nifi_c2_flow_url, flowVersion->getFlowIdentifier()->getRegistryUrl());
} else {
logger_->log_debug("Invalid flow version, not setting");
}
}
return started;
}
int16_t FlowController::stop() {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
if (running_) {
// immediately indicate that we are not running
logger_->log_info("Stop Flow Controller");
root_wrapper_.stopProcessing(*timer_scheduler_, *event_scheduler_, *cron_scheduler_);
// stop after we've attempted to stop the processors.
timer_scheduler_->stop();
event_scheduler_->stop();
cron_scheduler_->stop();
thread_pool_.shutdown();
/* STOP! Before you change it, consider the following:
* -Stopping the schedulers doesn't actually quit the onTrigger functions of processors
* -They only guarantee that the processors are not scheduled anymore
* -After the threadpool is stopped we can make sure that processors don't need repos and controllers anymore */
root_wrapper_.drainConnections();
this->flow_file_repo_->stop();
this->provenance_repo_->stop();
this->content_repo_->stop();
// stop the ControllerServices
disableAllControllerServices();
initialized_ = false;
running_ = false;
}
return 0;
}
/**
* This function will attempt to unload yaml and stop running Processors.
*
* If the latter attempt fails or does not complete within the prescribed
* period, running_ will be set to false and we will return.
*
* @param timeToWaitMs Maximum time to wait before manually
* marking running as false.
*/
void FlowController::waitUnload(const uint64_t timeToWaitMs) {
if (running_) {
// use the current time and increment with the provided argument.
std::chrono::system_clock::time_point wait_time = std::chrono::system_clock::now() + std::chrono::milliseconds(timeToWaitMs);
// create an asynchronous future.
std::future<void> unload_task = std::async(std::launch::async, [this]() {stop();});
if (std::future_status::ready == unload_task.wait_until(wait_time)) {
running_ = false;
}
}
}
std::unique_ptr<core::ProcessGroup> FlowController::loadInitialFlow() {
std::unique_ptr<core::ProcessGroup> root = flow_configuration_->getRoot();
if (root) {
return root;
}
logger_->log_error("Couldn't load flow configuration file, trying to fetch it from C2 server");
auto opt_flow_url = configuration_->get(Configure::nifi_c2_flow_url);
if (!opt_flow_url) {
logger_->log_error("No flow configuration url found");
return nullptr;
}
// ensure that C2 connection is up and running
// since we don't have access to the flow definition, the C2 communication
// won't be able to use the services defined there, e.g. SSLContextService
if (!c2_agent_) {
return nullptr;
}
c2_agent_->initialize(this, this, this);
c2_agent_->start();
auto opt_source = c2_agent_->fetchFlow(*opt_flow_url);
if (!opt_source) {
logger_->log_error("Couldn't fetch flow configuration from C2 server");
return nullptr;
}
root = updateFromPayload(*opt_flow_url, *opt_source);
if (root) {
logger_->log_info("Successfully fetched valid flow configuration");
if (!flow_configuration_->persist(*opt_source)) {
logger_->log_info("Failed to write the fetched flow to disk");
}
}
return root;
}
void FlowController::load(bool reload) {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
if (running_) {
stop();
}
if (reload) {
io::NetworkPrioritizerFactory::getInstance()->clearPrioritizer();
}
if (!root_wrapper_.initialized()) {
logger_->log_info("Instantiating new flow");
root_wrapper_.setNewRoot(loadInitialFlow());
if (c2_agent_ && !c2_agent_->isControllerRunning()) {
// TODO(lordgamez): this initialization configures the C2 sender protocol (e.g. RESTSender) which may contain an SSL Context service from the flow config
// for SSL communication. This service may change on flow update and we should take care of the SSL Context Service change in the C2 Agent.
c2_agent_->initialize(this, this, this);
c2_agent_->start();
}
}
logger_->log_info("Loaded root processor Group");
logger_->log_info("Initializing timers");
if (!thread_pool_.isRunning() || reload) {
thread_pool_.shutdown();
thread_pool_.setMaxConcurrentTasks(configuration_->getInt(Configure::nifi_flow_engine_threads, 5));
thread_pool_.setControllerServiceProvider(this);
thread_pool_.start();
}
conditionalReloadScheduler<TimerDrivenSchedulingAgent>(timer_scheduler_, !timer_scheduler_ || reload);
conditionalReloadScheduler<EventDrivenSchedulingAgent>(event_scheduler_, !event_scheduler_ || reload);
conditionalReloadScheduler<CronDrivenSchedulingAgent>(cron_scheduler_, !cron_scheduler_ || reload);
logger_->log_info("Loaded controller service provider");
/*
* Without reset we have to distinguish a fresh restart and a reload, to decide if we have to
* increment the claims' counter on behalf of the persisted instances.
* ResourceClaim::getStreamCount is not suitable as multiple persisted instances
* might have the same claim.
* e.g. without reset a streamCount of 3 could mean the following:
* - it was a fresh restart and 3 instances of this claim have already been resurrected -> we must increment
* - it was a reload and 3 instances have been persisted before the shutdown -> we must not increment
*/
content_repo_->reset();
logger_->log_info("Reset content repository");
// Load Flow File from Repo
loadFlowRepo();
logger_->log_info("Loaded flow repository");
initialized_ = true;
}
void FlowController::loadFlowRepo() {
if (this->flow_file_repo_ != nullptr) {
logger_->log_debug("Getting connection map");
std::map<std::string, core::Connectable*> connectionMap;
std::map<std::string, core::Connectable*> containers;
root_wrapper_.getConnections(connectionMap);
root_wrapper_.getFlowFileContainers(containers);
flow_file_repo_->setConnectionMap(connectionMap);
flow_file_repo_->setContainers(containers);
flow_file_repo_->loadComponent(content_repo_);
} else {
logger_->log_debug("Flow file repository is not set");
}
}
int16_t FlowController::start() {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
if (!initialized_) {
logger_->log_error("Can not start Flow Controller because it has not been initialized");
return -1;
} else if (!running_) {
logger_->log_info("Starting Flow Controller");
enableAllControllerServices();
timer_scheduler_->start();
event_scheduler_->start();
cron_scheduler_->start();
// watch out, this might immediately start the processors
// as the thread_pool_ is started in load()
if (root_wrapper_.startProcessing(*timer_scheduler_, *event_scheduler_, *cron_scheduler_)) {
start_time_ = std::chrono::steady_clock::now();
}
core::logging::LoggerConfiguration::getConfiguration().initializeAlertSinks(this, configuration_);
running_ = true;
protocol_->start();
content_repo_->start();
provenance_repo_->start();
flow_file_repo_->start();
thread_pool_.start();
logger_->log_info("Started Flow Controller");
}
return 0;
}
int16_t FlowController::pause() {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
if (!running_) {
logger_->log_warn("Can not pause flow controller that is not running");
return 0;
}
logger_->log_info("Pausing Flow Controller");
thread_pool_.pause();
return 0;
}
int16_t FlowController::resume() {
std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
if (!running_) {
logger_->log_warn("Can not resume flow controller tasks because the flow controller is not running");
return 0;
}
logger_->log_info("Resuming Flow Controller");
thread_pool_.resume();
return 0;
}
std::vector<std::string> FlowController::getSupportedConfigurationFormats() const {
return flow_configuration_->getSupportedFormats();
}
int16_t FlowController::applyUpdate(const std::string &source, const std::string &configuration, bool persist, const std::optional<std::string>& flow_id) {
if (applyConfiguration(source, configuration, flow_id)) {
if (persist) {
flow_configuration_->persist(configuration);
}
return 0;
} else {
return -1;
}
}
int16_t FlowController::clearConnection(const std::string &connection) {
root_wrapper_.clearConnection(connection);
return -1;
}
void FlowController::executeOnAllComponents(std::function<void(state::StateController&)> func) {
if (updating_ || !initialized_) {
return;
}
std::lock_guard<std::recursive_mutex> lock(mutex_);
for (auto* component: getAllComponents()) {
func(*component);
}
}
void FlowController::executeOnComponent(const std::string &id_or_name, std::function<void(state::StateController&)> func) {
if (updating_ || !initialized_) {
return;
}
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (auto* component = getComponent(id_or_name); component != nullptr) {
func(*component);
} else {
logger_->log_error("Could not get execute requested callback for component \"%s\", because component was not found", id_or_name);
}
}
std::vector<state::StateController*> FlowController::getAllComponents() {
if (auto components = root_wrapper_.getAllProcessorControllers([this](core::Processor& p) { return createController(p); })) {
components->push_back(this);
return *components;
}
return {this};
}
state::StateController* FlowController::getComponent(const std::string& id_or_name) {
if (id_or_name == getUUIDStr() || id_or_name == "FlowController") {
return this;
} else if (auto controller = root_wrapper_.getProcessorController(id_or_name, [this](core::Processor& p) { return createController(p); })) {
return controller;
}
return nullptr;
}
gsl::not_null<std::unique_ptr<state::ProcessorController>> FlowController::createController(core::Processor& processor) {
const auto scheduler = [this, &processor]() -> SchedulingAgent& {
switch (processor.getSchedulingStrategy()) {
case core::SchedulingStrategy::TIMER_DRIVEN: return *timer_scheduler_;
case core::SchedulingStrategy::EVENT_DRIVEN: return *event_scheduler_;
case core::SchedulingStrategy::CRON_DRIVEN: return *cron_scheduler_;
}
gsl_Assert(false);
};
return gsl::make_not_null(std::make_unique<state::ProcessorController>(processor, scheduler()));
}
uint64_t FlowController::getUptime() {
auto now = std::chrono::steady_clock::now();
auto time_since = std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time_).count();
return time_since;
}
std::vector<BackTrace> FlowController::getTraces() {
std::vector<BackTrace> traces{thread_pool_.getTraces()};
if (auto provenance_repo = std::dynamic_pointer_cast<core::ThreadedRepository>(provenance_repo_)) {
auto prov_repo_trace = provenance_repo->getTraces();
traces.emplace_back(std::move(prov_repo_trace));
}
if (auto flow_file_repo = std::dynamic_pointer_cast<core::ThreadedRepository>(flow_file_repo_)) {
auto flow_repo_trace = flow_file_repo->getTraces();
traces.emplace_back(std::move(flow_repo_trace));
}
auto my_traces = TraceResolver::getResolver().getBackTrace("main");
traces.emplace_back(std::move(my_traces));
return traces;
}
std::map<std::string, std::unique_ptr<io::InputStream>> FlowController::getDebugInfo() {
std::map<std::string, std::unique_ptr<io::InputStream>> debug_info;
if (auto logs = core::logging::LoggerConfiguration::getCompressedLog(true)) {
debug_info["minifi.log.gz"] = std::move(logs);
}
if (auto opt_flow_path = flow_configuration_->getConfigurationPath()) {
debug_info["config.yml"] = std::make_unique<io::FileStream>(opt_flow_path.value(), 0, false);
}
debug_info["minifi.properties"] = std::make_unique<io::FileStream>(configuration_->getFilePath(), 0, false);
return debug_info;
}
std::unique_ptr<core::ProcessGroup> FlowController::updateFromPayload(const std::string& url, const std::string& config_payload, const std::optional<std::string>& flow_id) {
auto root = flow_configuration_->updateFromPayload(url, config_payload, flow_id);
// prepare to accept the new controller service provider from flow_configuration_
clearControllerServices();
controller_service_provider_impl_ = flow_configuration_->getControllerServiceProvider();
return root;
}
} // namespace org::apache::nifi::minifi