| /** |
| * @file FlowController.h |
| * FlowController class declaration |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #pragma once |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <functional> |
| #include <map> |
| #include <memory> |
| #include <mutex> |
| #include <cstdio> |
| #include <queue> |
| #include <set> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "Connection.h" |
| #include "core/controller/ControllerServiceNode.h" |
| #include "core/controller/ControllerServiceProvider.h" |
| #include "core/controller/ForwardingControllerServiceProvider.h" |
| #include "core/FlowConfiguration.h" |
| #include "core/logging/Logger.h" |
| #include "core/ProcessContext.h" |
| #include "core/ProcessGroup.h" |
| #include "core/Processor.h" |
| #include "core/ProcessSession.h" |
| #include "core/Property.h" |
| #include "core/Relationship.h" |
| #include "core/state/nodes/FlowInformation.h" |
| #include "core/state/nodes/MetricsBase.h" |
| #include "core/state/UpdateController.h" |
| #include "c2/C2Agent.h" |
| #include "CronDrivenSchedulingAgent.h" |
| #include "EventDrivenSchedulingAgent.h" |
| #include "FlowFileRecord.h" |
| #include "properties/Configure.h" |
| #include "TimerDrivenSchedulingAgent.h" |
| #include "utils/Id.h" |
| #include "utils/file/FileSystem.h" |
| #include "core/state/nodes/ResponseNodeLoader.h" |
| #include "core/state/MetricsPublisher.h" |
| #include "core/state/MetricsPublisherStore.h" |
| #include "RootProcessGroupWrapper.h" |
| #include "c2/ControllerSocketProtocol.h" |
| |
| namespace org::apache::nifi::minifi { |
| |
| namespace state { |
| class ProcessorController; |
| } // namespace state |
| |
| class FlowController : public core::controller::ForwardingControllerServiceProvider, public state::StateMonitor { |
| public: |
| FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, |
| std::shared_ptr<Configure> configure, std::shared_ptr<core::FlowConfiguration> flow_configuration, |
| std::shared_ptr<core::ContentRepository> content_repo, std::unique_ptr<state::MetricsPublisherStore> metrics_publisher_store = nullptr, |
| std::shared_ptr<utils::file::FileSystem> filesystem = std::make_shared<utils::file::FileSystem>(), std::function<void()> request_restart = []{}); |
| |
| ~FlowController() override; |
| |
| virtual std::shared_ptr<core::Repository> getProvenanceRepository() { |
| return this->provenance_repo_; |
| } |
| |
| virtual void load(bool reload = false); |
| |
| void load(std::unique_ptr<core::ProcessGroup> root, bool reload = false) { |
| root_wrapper_.setNewRoot(std::move(root)); |
| load(reload); |
| } |
| |
| bool isRunning() const override { |
| return running_.load() || updating_.isUpdating(); |
| } |
| |
| virtual bool isInitialized() { |
| return initialized_.load(); |
| } |
| // Start the Flow Controller which internally starts the root process group and all its children |
| int16_t start() override; |
| int16_t pause() override; |
| int16_t resume() override; |
| // Unload the current flow, clean the root process group and all its children |
| int16_t stop() override; |
| int16_t applyUpdate(const std::string &source, const std::string &configuration, bool persist, const std::optional<std::string>& flow_id) override; |
| int16_t drainRepositories() override { |
| return -1; |
| } |
| |
| void executeOnComponent(const std::string& id_or_name, std::function<void(state::StateController&)> func) override; |
| void executeOnAllComponents(std::function<void(state::StateController&)> func) override; |
| |
| int16_t clearConnection(const std::string &connection) override; |
| |
| std::vector<std::string> getSupportedConfigurationFormats() const override; |
| |
| int16_t applyUpdate(const std::string& /*source*/, const std::shared_ptr<state::Update>&) override { return -1; } |
| // Asynchronous function trigger unloading and wait for a period of time |
| virtual void waitUnload(const std::chrono::milliseconds time_to_wait); |
| void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) { |
| root_wrapper_.updatePropertyValue(std::move(processorName), std::move(propertyName), std::move(propertyValue)); |
| } |
| |
| // validate and apply passing configuration payload |
| // first it will validate the payload with the current root node config for flowController |
| // like FlowController id/name is the same and new version is greater than the current version |
| // after that, it will apply the configuration |
| bool applyConfiguration(const std::string &source, const std::string &configurePayload, const std::optional<std::string>& flow_id = std::nullopt); |
| |
| std::string getName() const override { |
| return root_wrapper_.getName(); |
| } |
| |
| std::string getComponentName() const override { |
| return "FlowController"; |
| } |
| |
| utils::Identifier getComponentUUID() const override { |
| return root_wrapper_.getComponentUUID(); |
| } |
| |
| virtual std::string getVersion() { |
| return root_wrapper_.getVersion(); |
| } |
| |
| uint64_t getUptime() override; |
| |
| std::vector<BackTrace> getTraces() override; |
| |
| std::map<std::string, std::unique_ptr<io::InputStream>> getDebugInfo() override; |
| |
| private: |
| class UpdateState { |
| public: |
| bool isUpdating() const { return update_block_count_ > 0; } |
| void beginUpdate() { ++update_block_count_; } |
| void endUpdate() { --update_block_count_; } |
| void lock() { beginUpdate(); } |
| void unlock() { endUpdate(); } |
| |
| private: |
| std::atomic<uint32_t> update_block_count_; |
| }; |
| |
| /** |
| * Loads the flow as specified in the flow config file or if not present |
| * tries to fetch it from the C2 server (if enabled). |
| * @return the built flow |
| */ |
| std::unique_ptr<core::ProcessGroup> loadInitialFlow(); |
| |
| void loadFlowRepo(); |
| std::vector<state::StateController*> getAllComponents(); |
| state::StateController* getComponent(const std::string& id_or_name); |
| gsl::not_null<std::unique_ptr<state::ProcessorController>> createController(core::Processor& processor); |
| std::unique_ptr<core::ProcessGroup> updateFromPayload(const std::string& url, const std::string& config_payload, const std::optional<std::string>& flow_id = std::nullopt); |
| |
| template <typename T, typename = typename std::enable_if<std::is_base_of<SchedulingAgent, T>::value>::type> |
| void conditionalReloadScheduler(std::unique_ptr<T>& scheduler, const bool condition) { |
| if (condition) { |
| scheduler = std::make_unique<T>(gsl::not_null<core::controller::ControllerServiceProvider*>(this), provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_); |
| } |
| } |
| |
| std::recursive_mutex mutex_; |
| std::atomic<bool> running_; |
| UpdateState updating_; |
| std::atomic<bool> initialized_; |
| std::unique_ptr<TimerDrivenSchedulingAgent> timer_scheduler_; |
| std::unique_ptr<EventDrivenSchedulingAgent> event_scheduler_; |
| std::unique_ptr<CronDrivenSchedulingAgent> cron_scheduler_; |
| std::chrono::steady_clock::time_point start_time_; |
| // Thread pool for schedulers |
| utils::ThreadPool thread_pool_; |
| std::shared_ptr<Configure> configuration_; |
| std::shared_ptr<core::Repository> provenance_repo_; |
| std::shared_ptr<core::Repository> flow_file_repo_; |
| std::shared_ptr<core::ContentRepository> content_repo_; |
| std::shared_ptr<core::FlowConfiguration> flow_configuration_; |
| std::unique_ptr<state::MetricsPublisherStore> metrics_publisher_store_; |
| RootProcessGroupWrapper root_wrapper_; |
| std::unique_ptr<c2::C2Agent> c2_agent_{}; |
| std::unique_ptr<c2::ControllerSocketProtocol> controller_socket_protocol_; |
| std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<FlowController>::getLogger(); |
| }; |
| |
| } // namespace org::apache::nifi::minifi |