blob: 96dd684382273f64d9539a535aa2d79d55466dd8 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory>
#include <string>
#include <vector>
#include <queue>
#include <map>
#include <mutex>
#include <atomic>
#include <algorithm>
#include <set>
#include <utility>
#include <tuple>
#include "Processor.h"
#include "Exception.h"
#include "TimerDrivenSchedulingAgent.h"
#include "EventDrivenSchedulingAgent.h"
#include "CronDrivenSchedulingAgent.h"
#include "Port.h"
#include "core/logging/Logger.h"
#include "controller/ControllerServiceNode.h"
#include "controller/ControllerServiceMap.h"
#include "utils/Id.h"
#include "utils/BaseHTTPClient.h"
#include "utils/CallBackTimer.h"
#include "range/v3/algorithm/find_if.hpp"
struct ProcessGroupTestAccessor;
namespace org::apache::nifi::minifi {
namespace state {
class StateController;
class ProcessorController;
} // namespace state
namespace core {
enum ProcessGroupType {
ROOT_PROCESS_GROUP = 0,
SIMPLE_PROCESS_GROUP,
REMOTE_PROCESS_GROUP,
};
#define ONSCHEDULE_RETRY_INTERVAL 30000 // millisecs
class ProcessGroup : public CoreComponent {
friend struct ::ProcessGroupTestAccessor;
public:
enum class Traverse {
ExcludeChildren,
IncludeChildren
};
ProcessGroup(ProcessGroupType type, std::string name, const utils::Identifier& uuid, int version, ProcessGroup *parent);
ProcessGroup(ProcessGroupType type, std::string name);
ProcessGroup(ProcessGroupType type, std::string name, const utils::Identifier& uuid);
ProcessGroup(ProcessGroupType type, std::string name, const utils::Identifier& uuid, int version);
// Destructor
~ProcessGroup() override;
// Set URL
void setURL(std::string url) {
url_ = std::move(url);
}
// Get URL
std::string getURL() {
return (url_);
}
// SetTransmitting
void setTransmitting(bool val) {
transmitting_ = val;
}
void setTimeout(uint64_t time) {
timeout_ = time;
}
uint64_t getTimeout() {
return timeout_;
}
// setInterface
void setInterface(std::string &ifc) {
local_network_interface_ = ifc;
}
std::string getInterface() {
return local_network_interface_;
}
void setTransportProtocol(std::string &protocol) {
transport_protocol_ = protocol;
}
std::string getTransportProtocol() {
return transport_protocol_;
}
void setHttpProxyHost(std::string &host) {
proxy_.host = host;
}
std::string getHttpProxyHost() const {
return proxy_.host;
}
void setHttpProxyUserName(std::string &username) {
proxy_.username = username;
}
void setHttpProxyPassWord(std::string &password) {
proxy_.password = password;
}
void setHttpProxyPort(int port) {
proxy_.port = port;
}
utils::HTTPProxy getHTTPProxy() {
return proxy_;
}
// Set Processor yield period in MilliSecond
void setYieldPeriodMsec(std::chrono::milliseconds period) {
yield_period_msec_ = period;
}
// Get Processor yield period in MilliSecond
std::chrono::milliseconds getYieldPeriodMsec() {
return (yield_period_msec_);
}
void setOnScheduleRetryPeriod(int64_t period) {
onschedule_retry_msec_ = period;
}
int64_t getOnScheduleRetryPeriod() {
return onschedule_retry_msec_;
}
int getVersion() const {
return config_version_;
}
void startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler,
const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler);
void stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler,
const std::shared_ptr<EventDrivenSchedulingAgent>& eventScheduler,
const std::shared_ptr<CronDrivenSchedulingAgent>& cronScheduler,
const std::function<bool(const Processor*)>& filter = nullptr);
bool isRemoteProcessGroup();
// set parent process group
void setParent(ProcessGroup *parent) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
parent_process_group_ = parent;
}
// get parent process group
ProcessGroup *getParent() {
std::lock_guard<std::recursive_mutex> lock(mutex_);
return parent_process_group_;
}
[[maybe_unused]] std::tuple<Processor*, bool> addProcessor(std::unique_ptr<Processor> processor);
void addPort(std::unique_ptr<Port> port);
void addProcessGroup(std::unique_ptr<ProcessGroup> child);
void addConnection(std::unique_ptr<Connection> connection);
const std::set<Port*>& getPorts() const {
return ports_;
}
Port* findPortById(const utils::Identifier& uuid) const;
Port* findChildPortById(const utils::Identifier& uuid) const;
template <typename Fun>
Processor* findProcessor(Fun condition, Traverse traverse) const {
std::lock_guard<std::recursive_mutex> lock(mutex_);
const auto found = ranges::find_if(processors_, condition);
if (found != ranges::end(processors_)) {
return found->get();
}
for (const auto& processGroup : child_process_groups_) {
if (processGroup->isRemoteProcessGroup() || traverse == Traverse::IncludeChildren) {
const auto processor = processGroup->findProcessor(condition, traverse);
if (processor) {
return processor;
}
}
}
return nullptr;
}
// findProcessor based on UUID
Processor* findProcessorById(const utils::Identifier& uuid, Traverse traverse = Traverse::IncludeChildren) const;
// findProcessor based on name
Processor* findProcessorByName(const std::string &processorName, Traverse traverse = Traverse::IncludeChildren) const;
void getAllProcessors(std::vector<Processor*>& processor_vec) const;
/**
* Add controller service
* @param nodeId node identifier
* @param node controller service node.
*/
void addControllerService(const std::string &nodeId, const std::shared_ptr<core::controller::ControllerServiceNode> &node);
/**
* Find controllerservice node will search child groups until the nodeId is found.
* @param node node identifier
* @return controller service node, if it exists.
*/
std::shared_ptr<core::controller::ControllerServiceNode> findControllerService(const std::string &nodeId);
// update property value
void updatePropertyValue(const std::string& processorName, const std::string& propertyName, const std::string& propertyValue);
void getConnections(std::map<std::string, Connection*>& connectionMap);
void getConnections(std::map<std::string, Connectable*>& connectionMap);
void getFlowFileContainers(std::map<std::string, Connectable*>& containers) const;
void drainConnections();
std::size_t getTotalFlowFileCount() const;
void verify() const;
protected:
void startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler); // NOLINT
// version
int config_version_;
// Process Group Type
const ProcessGroupType type_;
// Processors (ProcessNode) inside this process group which include Remote Process Group input/Output port
std::set<std::unique_ptr<Processor>> processors_;
std::set<Processor*> failed_processors_;
std::set<Port*> ports_;
std::set<std::unique_ptr<ProcessGroup>> child_process_groups_;
// Connections between the processor inside the group;
std::set<std::unique_ptr<Connection>> connections_;
// Parent Process Group
ProcessGroup* parent_process_group_;
// Yield Period in Milliseconds
std::atomic<std::chrono::milliseconds> yield_period_msec_;
std::atomic<uint64_t> timeout_;
std::atomic<int64_t> onschedule_retry_msec_;
// URL
std::string url_;
// local network interface
std::string local_network_interface_;
// Transmitting
std::atomic<bool> transmitting_;
// http proxy
utils::HTTPProxy proxy_;
std::string transport_protocol_;
// controller services
core::controller::ControllerServiceMap controller_service_map_;
private:
static Port* findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid);
// Mutex for protection
mutable std::recursive_mutex mutex_;
// Logger
std::shared_ptr<logging::Logger> logger_;
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
ProcessGroup(const ProcessGroup &parent);
ProcessGroup &operator=(const ProcessGroup &parent);
static std::shared_ptr<utils::IdGenerator> id_generator_;
std::unique_ptr<utils::CallBackTimer> onScheduleTimer_;
};
} // namespace core
} // namespace org::apache::nifi::minifi