// blob: 9ed88a3c3bd24af79546a545eff9c59ddcf0c58c [file] [log] [blame]
/**
* @file FlowController.h
* FlowController class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_FLOWCONTROLLER_H_
#define LIBMINIFI_INCLUDE_FLOWCONTROLLER_H_
#include <algorithm>
#include <atomic>
#include <map>
#include <memory>
#include <mutex>
#include <cstdio>
#include <queue>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "Connection.h"
#include "core/controller/ControllerServiceNode.h"
#include "core/controller/ControllerServiceProvider.h"
#include "core/controller/ForwardingControllerServiceProvider.h"
#include "core/FlowConfiguration.h"
#include "core/logging/Logger.h"
#include "core/ProcessContext.h"
#include "core/ProcessGroup.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include "core/Property.h"
#include "core/Relationship.h"
#include "core/state/nodes/FlowInformation.h"
#include "core/state/nodes/MetricsBase.h"
#include "core/state/UpdateController.h"
#include "c2/C2Client.h"
#include "CronDrivenSchedulingAgent.h"
#include "EventDrivenSchedulingAgent.h"
#include "FlowControlProtocol.h"
#include "FlowFileRecord.h"
#include "properties/Configure.h"
#include "TimerDrivenSchedulingAgent.h"
#include "utils/Id.h"
#include "utils/file/FileSystem.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
// Default NiFi root group name: an empty string.
// Used as the default value of the `name` argument of the FlowController
// constructors declared below.
#define DEFAULT_ROOT_GROUP_NAME ""
/**
 * Flow Controller class. Generally used by the FlowController factory
 * as a singleton.
 *
 * Combines the roles of its base classes: a forwarding controller service
 * provider, a state monitor and a C2 client. Most accessors that depend on
 * the root process group (root_) fall back to a neutral value when no root
 * group is present.
 */
class FlowController : public core::controller::ForwardingControllerServiceProvider, public state::StateMonitor, public c2::C2Client, public std::enable_shared_from_this<FlowController> {
 public:
  /**
   * Primary constructor (defined out of line).
   * @param provenance_repo provenance repository
   * @param flow_file_repo flow file repository
   * @param configure agent configuration
   * @param flow_configuration flow configuration; ownership is transferred
   * @param content_repo content repository
   * @param name controller name, defaults to DEFAULT_ROOT_GROUP_NAME
   * @param headless_mode defaults to false
   * @param filesystem filesystem abstraction, defaults to a freshly created utils::file::FileSystem
   */
  FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo,
                 std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration,
                 std::shared_ptr<core::ContentRepository> content_repo, std::string name = DEFAULT_ROOT_GROUP_NAME,
                 bool headless_mode = false, std::shared_ptr<utils::file::FileSystem> filesystem = std::make_shared<utils::file::FileSystem>());

  /**
   * Convenience overload that accepts a custom filesystem while keeping the
   * default name (DEFAULT_ROOT_GROUP_NAME) and headless_mode == false.
   * Delegates to the primary constructor.
   */
  FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo,
                 std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration,
                 std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<utils::file::FileSystem> filesystem)
      : FlowController(std::move(provenance_repo), std::move(flow_file_repo), std::move(configure), std::move(flow_configuration),
                       std::move(content_repo), DEFAULT_ROOT_GROUP_NAME, false, std::move(filesystem)) {}

  ~FlowController() override;

  // Get the provenance repository
  virtual std::shared_ptr<core::Repository> getProvenanceRepository() {
    return provenance_repo_;
  }

  // Get the flowfile repository
  virtual std::shared_ptr<core::Repository> getFlowFileRepository() {
    return flow_file_repo_;
  }

  // Load the flow configuration from disk, after that create the root process
  // group and its children and initialize the flows.
  // `root` defaults to nullptr and `reload` to false.
  virtual void load(const std::shared_ptr<core::ProcessGroup> &root = nullptr, bool reload = false);

  // Whether the Flow Controller is running; also reports true while an
  // update is in progress (updating_ is set).
  bool isRunning() override {
    return running_.load() || updating_.load();
  }

  // Whether the Flow Controller has already been initialized (flow configuration loaded)
  virtual bool isInitialized() {
    return initialized_.load();
  }

  // Start to run the Flow Controller which internally starts the root process group and all its children
  int16_t start() override;

  // Always returns -1.
  // NOTE(review): presumably signals "not supported" - confirm against the state::StateMonitor contract.
  int16_t pause() override {
    return -1;
  }

  // Stop the Flow Controller.
  // NOTE(review): the exact shutdown sequence lives in the implementation file and is not visible here.
  int16_t stop() override;

  // Apply a configuration update given as a string; `persist` is forwarded to the implementation.
  int16_t applyUpdate(const std::string &source, const std::string &configuration, bool persist) override;

  // Always returns -1.
  // NOTE(review): presumably signals "not supported" - confirm against the state::StateMonitor contract.
  int16_t drainRepositories() override {
    return -1;
  }

  // Get the state controllers of the components matching `name`
  std::vector<std::shared_ptr<state::StateController>> getComponents(const std::string &name) override;

  // Get the state controllers of all components
  std::vector<std::shared_ptr<state::StateController>> getAllComponents() override;

  // Clear the connection identified by `connection`
  int16_t clearConnection(const std::string &connection) override;

  // Update-object overload; both arguments are ignored and -1 is always returned.
  int16_t applyUpdate(const std::string& /*source*/, const std::shared_ptr<state::Update>&) override { return -1; }

  // Asynchronous function: trigger unloading and wait for a period of time (in milliseconds)
  virtual void waitUnload(uint64_t timeToWaitMs);

  // Unload the current flow, clean the root process group and all its children
  virtual void unload();

  // Update a property value; forwarded to the root process group, no-op when there is no root group
  void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) {
    if (root_ != nullptr) {
      root_->updatePropertyValue(std::move(processorName), std::move(propertyName), std::move(propertyValue));
    }
  }

  // Set the serial number
  void setSerialNumber(std::string number) {
    serial_number_ = std::move(number);
  }

  // Get the serial number as a string (returns a copy; does not modify the controller)
  std::string getSerialNumber() const {
    return serial_number_;
  }

  // Validate and apply the passed configuration payload.
  // First it will validate the payload with the current root node config for flowController,
  // like FlowController id/name is the same and new version is greater than the current version;
  // after that, it will apply the configuration.
  bool applyConfiguration(const std::string &source, const std::string &configurePayload);

  // Get the name of the root process group, or an empty string when there is no root group
  std::string getName() const override {
    if (root_ == nullptr) {
      return "";
    }
    return root_->getName();
  }

  // Component name reported for this class: always "FlowController"
  std::string getComponentName() const override {
    return "FlowController";
  }

  // UUID of the root process group, or a default-constructed Identifier when there is no root group
  utils::Identifier getComponentUUID() const override {
    if (root_ == nullptr) {
      return {};
    }
    return root_->getUUID();
  }

  // Get the version of the root process group as a string, or "0" when there is no root group
  virtual std::string getVersion() {
    if (root_ == nullptr) {
      return "0";
    }
    return std::to_string(root_->getVersion());
  }

  // UUID of the controller itself (forwards to getUUID())
  utils::Identifier getControllerUUID() const override {
    return getUUID();
  }

  /**
   * Retrieves the agent manifest to be sent as a response to C2 DESCRIBE manifest
   * @return the agent manifest response node
   */
  std::shared_ptr<state::response::ResponseNode> getAgentManifest() const override;

  // Get the uptime of the controller
  // NOTE(review): unit and reference point are defined by the implementation - see start_time_.
  uint64_t getUptime() override;

  // Get back traces
  std::vector<BackTrace> getTraces() override;

 private:
  /**
   * Loads the flow as specified in the flow config file or if not present
   * tries to fetch it from the C2 server (if enabled).
   * @return the built flow
   */
  std::unique_ptr<core::ProcessGroup> loadInitialFlow();

 protected:
  // function to load the flow file repo.
  void loadFlowRepo();

  void initializeExternalComponents();

  // Reads the shutdown timeout from the configuration; the optional is
  // presumably empty when no (valid) value is configured - confirm in the implementation.
  utils::optional<std::chrono::milliseconds> loadShutdownTimeoutFromConfiguration();

 private:
  /**
   * Replaces `scheduler` with a newly constructed T when `condition` is true;
   * leaves it untouched otherwise. Only instantiable for types derived from
   * SchedulingAgent.
   * @param scheduler scheduler pointer to overwrite
   * @param condition whether the scheduler should be recreated
   */
  template <typename T, typename = typename std::enable_if<std::is_base_of<SchedulingAgent, T>::value>::type>
  void conditionalReloadScheduler(std::shared_ptr<T>& scheduler, const bool condition) {
    if (condition) {
      scheduler = std::make_shared<T>(gsl::not_null<core::controller::ControllerServiceProvider*>(this), provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
    }
  }

 protected:
  // flow controller mutex
  std::recursive_mutex mutex_;
  // Whether it is running
  std::atomic<bool> running_;
  // Whether an update is in progress; makes isRunning() report true as well
  std::atomic<bool> updating_;
  // Whether it has already been initialized (flow configuration loaded already)
  std::atomic<bool> initialized_;
  // Thread pool for schedulers
  utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_;
  // Flow Timer Scheduler
  std::shared_ptr<TimerDrivenSchedulingAgent> timer_scheduler_;
  // Flow Event Scheduler
  std::shared_ptr<EventDrivenSchedulingAgent> event_scheduler_;
  // Cron Schedule
  std::shared_ptr<CronDrivenSchedulingAgent> cron_scheduler_;
  // FlowControl Protocol
  std::unique_ptr<FlowControlProtocol> protocol_;
  // metrics information
  std::chrono::steady_clock::time_point start_time_;

 private:
  // 1000 ms; presumably the polling period used while waiting for shutdown - confirm in the implementation
  std::chrono::milliseconds shutdown_check_interval_{1000};
  std::shared_ptr<logging::Logger> logger_;
  // Serial number, accessed through setSerialNumber() / getSerialNumber()
  std::string serial_number_;
};
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org
#endif // LIBMINIFI_INCLUDE_FLOWCONTROLLER_H_