// Source blob: 6f14f04463aed83e11d22104cc1a287f853b9b0d
/**
* @file FlowController.h
* FlowController class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <set>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include "Connection.h"
#include "core/controller/ControllerServiceNode.h"
#include "core/controller/ControllerServiceProvider.h"
#include "core/controller/ForwardingControllerServiceProvider.h"
#include "core/FlowConfiguration.h"
#include "core/logging/Logger.h"
#include "core/ProcessContext.h"
#include "core/ProcessGroup.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include "core/Property.h"
#include "core/Relationship.h"
#include "core/state/nodes/FlowInformation.h"
#include "core/state/nodes/MetricsBase.h"
#include "core/state/UpdateController.h"
#include "c2/C2Agent.h"
#include "CronDrivenSchedulingAgent.h"
#include "EventDrivenSchedulingAgent.h"
#include "FlowFileRecord.h"
#include "properties/Configure.h"
#include "TimerDrivenSchedulingAgent.h"
#include "utils/Id.h"
#include "utils/file/FileSystem.h"
#include "core/state/nodes/ResponseNodeLoader.h"
#include "core/state/MetricsPublisher.h"
#include "core/state/MetricsPublisherStore.h"
#include "RootProcessGroupWrapper.h"
#include "c2/ControllerSocketProtocol.h"
namespace org::apache::nifi::minifi {
namespace state {
// Forward declaration only: within this header the type appears solely inside
// the return type of the FlowController::createController() declaration.
class ProcessorController;
} // namespace state
/**
 * Top-level controller of a flow.
 *
 * Holds the repositories and the configuration, owns the scheduling agents
 * (timer-, event- and cron-driven) together with the thread pool handed to
 * them, and reaches the root process group through a RootProcessGroupWrapper.
 * The control operations (start/stop/pause/resume/applyUpdate, ...) are
 * overrides of virtual functions declared by the base classes.
 */
class FlowController : public core::controller::ForwardingControllerServiceProvider, public state::StateMonitor {
 public:
  /**
   * @param provenance_repo         provenance repository
   * @param flow_file_repo          flow file repository
   * @param configure               agent configuration
   * @param flow_configuration      flow configuration
   * @param content_repo            content repository
   * @param metrics_publisher_store optional metrics publisher store; defaults to nullptr
   * @param filesystem              filesystem abstraction; defaults to a default-constructed utils::file::FileSystem
   * @param request_restart         callback; defaults to a no-op lambda
   *                                (NOTE(review): presumably invoked when a restart is requested - confirm in the .cpp)
   */
  FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo,
                 std::shared_ptr<Configure> configure, std::shared_ptr<core::FlowConfiguration> flow_configuration,
                 std::shared_ptr<core::ContentRepository> content_repo, std::unique_ptr<state::MetricsPublisherStore> metrics_publisher_store = nullptr,
                 std::shared_ptr<utils::file::FileSystem> filesystem = std::make_shared<utils::file::FileSystem>(), std::function<void()> request_restart = []{});

  ~FlowController() override;

  // Returns the provenance repository this controller was given.
  virtual std::shared_ptr<core::Repository> getProvenanceRepository() {
    return this->provenance_repo_;
  }

  // Loads the flow; `reload` defaults to false.
  virtual void load(bool reload = false);

  // Installs `root` as the new root process group, then performs a regular load(reload).
  void load(std::unique_ptr<core::ProcessGroup> root, bool reload = false) {
    root_wrapper_.setNewRoot(std::move(root));
    load(reload);
  }

  // True while the running flag is set or while at least one update block is open.
  bool isRunning() const override {
    return running_.load() || updating_.isUpdating();
  }

  // Returns the current value of the initialized flag.
  virtual bool isInitialized() {
    return initialized_.load();
  }

  // Start the Flow Controller which internally starts the root process group and all its children
  int16_t start() override;

  int16_t pause() override;

  int16_t resume() override;

  // Unload the current flow, clean the root process group and all its children
  int16_t stop() override;

  int16_t applyUpdate(const std::string &source, const std::string &configuration, bool persist, const std::optional<std::string>& flow_id) override;

  // Always returns -1; this override performs no work.
  int16_t drainRepositories() override {
    return -1;
  }

  void executeOnComponent(const std::string& id_or_name, std::function<void(state::StateController&)> func) override;

  void executeOnAllComponents(std::function<void(state::StateController&)> func) override;

  int16_t clearConnection(const std::string &connection) override;

  std::vector<std::string> getSupportedConfigurationFormats() const override;

  // Always returns -1; this override ignores both arguments.
  int16_t applyUpdate(const std::string& /*source*/, const std::shared_ptr<state::Update>&) override { return -1; }

  // Triggers unloading asynchronously and waits for the given period of time.
  virtual void waitUnload(const std::chrono::milliseconds time_to_wait);

  // Forwards the property update to the root process group wrapper.
  void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) {
    root_wrapper_.updatePropertyValue(std::move(processorName), std::move(propertyName), std::move(propertyValue));
  }

  // Validates and then applies the given configuration payload.
  // The payload is first validated against the current root node configuration of the
  // FlowController (e.g. the FlowController id/name must be the same and the new version
  // must be greater than the current version); after that the configuration is applied.
  bool applyConfiguration(const std::string &source, const std::string &configurePayload, const std::optional<std::string>& flow_id = std::nullopt);

  // Name as reported by the root process group wrapper.
  std::string getName() const override {
    return root_wrapper_.getName();
  }

  std::string getComponentName() const override {
    return "FlowController";
  }

  // UUID as reported by the root process group wrapper.
  utils::Identifier getComponentUUID() const override {
    return root_wrapper_.getComponentUUID();
  }

  // Version as reported by the root process group wrapper.
  virtual std::string getVersion() {
    return root_wrapper_.getVersion();
  }

  uint64_t getUptime() override;

  std::vector<BackTrace> getTraces() override;

  std::map<std::string, std::unique_ptr<io::InputStream>> getDebugInfo() override;

 private:
  /**
   * Counts the currently open "update blocks"; an update is considered in
   * progress while the count is non-zero.
   *
   * lock()/unlock() simply forward to beginUpdate()/endUpdate(), which lets
   * the type be used with scope guards such as std::lock_guard.
   */
  class UpdateState {
   public:
    bool isUpdating() const { return update_block_count_ > 0; }
    void beginUpdate() { ++update_block_count_; }
    void endUpdate() { --update_block_count_; }
    void lock() { beginUpdate(); }
    void unlock() { endUpdate(); }

   private:
    // Explicitly zero-initialized: before C++20 a default-constructed
    // std::atomic holds an indeterminate value, which would make
    // isUpdating() unreliable until the first matching begin/end pair.
    std::atomic<uint32_t> update_block_count_{0};
  };

  /**
   * Loads the flow as specified in the flow config file or if not present
   * tries to fetch it from the C2 server (if enabled).
   * @return the built flow
   */
  std::unique_ptr<core::ProcessGroup> loadInitialFlow();

  void loadFlowRepo();

  std::vector<state::StateController*> getAllComponents();

  state::StateController* getComponent(const std::string& id_or_name);

  gsl::not_null<std::unique_ptr<state::ProcessorController>> createController(core::Processor& processor);

  std::unique_ptr<core::ProcessGroup> updateFromPayload(const std::string& url, const std::string& config_payload, const std::optional<std::string>& flow_id = std::nullopt);

  /**
   * Replaces `scheduler` with a freshly constructed T when `condition` is true;
   * otherwise leaves it untouched. The new agent is built from this controller
   * (as controller service provider), its repositories, configuration and thread pool.
   * Only participates for types derived from SchedulingAgent.
   */
  template <typename T, typename = std::enable_if_t<std::is_base_of_v<SchedulingAgent, T>>>
  void conditionalReloadScheduler(std::unique_ptr<T>& scheduler, const bool condition) {
    if (!condition) {
      return;
    }
    scheduler = std::make_unique<T>(gsl::not_null<core::controller::ControllerServiceProvider*>(this), provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
  }

  std::recursive_mutex mutex_;

  // In-class defaults keep the flags well-defined even if a constructor
  // omits them; a constructor mem-initializer still takes precedence.
  std::atomic<bool> running_{false};
  UpdateState updating_;
  std::atomic<bool> initialized_{false};

  std::unique_ptr<TimerDrivenSchedulingAgent> timer_scheduler_;
  std::unique_ptr<EventDrivenSchedulingAgent> event_scheduler_;
  std::unique_ptr<CronDrivenSchedulingAgent> cron_scheduler_;

  std::chrono::steady_clock::time_point start_time_;

  // Thread pool for schedulers
  utils::ThreadPool thread_pool_;

  std::shared_ptr<Configure> configuration_;
  std::shared_ptr<core::Repository> provenance_repo_;
  std::shared_ptr<core::Repository> flow_file_repo_;
  std::shared_ptr<core::ContentRepository> content_repo_;
  std::shared_ptr<core::FlowConfiguration> flow_configuration_;
  std::unique_ptr<state::MetricsPublisherStore> metrics_publisher_store_;
  RootProcessGroupWrapper root_wrapper_;
  std::unique_ptr<c2::C2Agent> c2_agent_{};
  std::unique_ptr<c2::ControllerSocketProtocol> controller_socket_protocol_;
  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<FlowController>::getLogger();
};
} // namespace org::apache::nifi::minifi