blob: bb98c373174a853bd22738788c79eedc4e21372f [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_CORE_PROCESSGROUP_H_
#define LIBMINIFI_INCLUDE_CORE_PROCESSGROUP_H_
#include <memory>
#include <string>
#include <vector>
#include <queue>
#include <map>
#include <mutex>
#include <atomic>
#include <algorithm>
#include <set>
#include "Processor.h"
#include "Exception.h"
#include "TimerDrivenSchedulingAgent.h"
#include "EventDrivenSchedulingAgent.h"
#include "CronDrivenSchedulingAgent.h"
#include "core/logging/Logger.h"
#include "controller/ControllerServiceNode.h"
#include "controller/ControllerServiceMap.h"
#include "utils/Id.h"
#include "utils/HTTPClient.h"
#include "utils/CallBackTimer.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {
// Process Group Type
enum ProcessGroupType {
ROOT_PROCESS_GROUP = 0,
REMOTE_PROCESS_GROUP,
MAX_PROCESS_GROUP_TYPE
};
#define ONSCHEDULE_RETRY_INTERVAL 30000 // millisecs
// ProcessGroup Class
class ProcessGroup : public CoreComponent {
public:
// Constructor
/*!
* Create a new process group
*/
ProcessGroup(ProcessGroupType type, const std::string& name, const utils::Identifier& uuid, int version, ProcessGroup *parent);
ProcessGroup(ProcessGroupType type, const std::string& name);
ProcessGroup(ProcessGroupType type, const std::string& name, const utils::Identifier& uuid);
ProcessGroup(ProcessGroupType type, const std::string& name, const utils::Identifier& uuid, int version);
// Destructor
virtual ~ProcessGroup();
// Set URL
void setURL(std::string url) {
url_ = url;
}
// Get URL
std::string getURL(void) {
return (url_);
}
// SetTransmitting
void setTransmitting(bool val) {
transmitting_ = val;
}
// Get Transmitting
bool getTransmitting() {
return transmitting_;
}
// setTimeOut
void setTimeOut(uint64_t time) {
timeOut_ = time;
}
uint64_t getTimeOut() {
return timeOut_;
}
// setInterface
void setInterface(std::string &ifc) {
local_network_interface_ = ifc;
}
std::string getInterface() {
return local_network_interface_;
}
void setTransportProtocol(std::string &protocol) {
transport_protocol_ = protocol;
}
std::string getTransportProtocol() {
return transport_protocol_;
}
void setHttpProxyHost(std::string &host) {
proxy_.host = host;
}
std::string getHttpProxyHost() {
return proxy_.host;
}
void setHttpProxyUserName(std::string &username) {
proxy_.username = username;
}
std::string getHttpProxyUserName() {
return proxy_.username;
}
void setHttpProxyPassWord(std::string &password) {
proxy_.password = password;
}
std::string getHttpProxyPassWord() {
return proxy_.password;
}
void setHttpProxyPort(int port) {
proxy_.port = port;
}
int getHttpProxyPort() {
return proxy_.port;
}
utils::HTTPProxy getHTTPProxy() {
return proxy_;
}
// Set Processor yield period in MilliSecond
void setYieldPeriodMsec(uint64_t period) {
yield_period_msec_ = period;
}
// Get Processor yield period in MilliSecond
uint64_t getYieldPeriodMsec(void) {
return (yield_period_msec_);
}
void setOnScheduleRetryPeriod(int64_t period) {
onschedule_retry_msec_ = period;
}
int64_t getOnScheduleRetryPeriod() {
return onschedule_retry_msec_;
}
// getVersion
int getVersion() {
return config_version_;
}
// Start Processing
void startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler); // NOLINT
// Stop Processing
void stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler, const std::function<bool(const std::shared_ptr<Processor>&)>& filter = [] (const std::shared_ptr<Processor>&) {return true;}); // NOLINT
// Whether it is root process group
bool isRootProcessGroup();
// set parent process group
void setParent(ProcessGroup *parent) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
parent_process_group_ = parent;
}
// get parent process group
ProcessGroup *getParent(void) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
return parent_process_group_;
}
// Add processor
void addProcessor(const std::shared_ptr<Processor>& processor);
// Remove processor
void removeProcessor(const std::shared_ptr<Processor>& processor);
// Add child processor group
void addProcessGroup(ProcessGroup *child);
// Remove child processor group
void removeProcessGroup(ProcessGroup *child);
// ! Add connections
void addConnection(const std::shared_ptr<Connection>& connection);
// Generic find
template <typename Fun>
std::shared_ptr<Processor> findProcessor(Fun condition) const {
std::lock_guard<std::recursive_mutex> lock(mutex_);
const auto found = std::find_if(processors_.cbegin(), processors_.cend(), condition);
if (found != processors_.cend()) {
return *found;
}
for (const auto& processGroup : child_process_groups_) {
const std::shared_ptr<Processor> processor = processGroup->findProcessor(condition);
if (processor) {
return processor;
}
}
return nullptr;
}
// findProcessor based on UUID
std::shared_ptr<Processor> findProcessorById(const utils::Identifier& uuid) const;
// findProcessor based on name
std::shared_ptr<Processor> findProcessorByName(const std::string &processorName) const;
void getAllProcessors(std::vector<std::shared_ptr<Processor>> &processor_vec);
/**
* Add controller service
* @param nodeId node identifier
* @param node controller service node.
*/
void addControllerService(const std::string &nodeId, std::shared_ptr<core::controller::ControllerServiceNode> &node);
/**
* Find controllerservice node will search child groups until the nodeId is found.
* @param node node identifier
* @return controller service node, if it exists.
*/
std::shared_ptr<core::controller::ControllerServiceNode> findControllerService(const std::string &nodeId);
// removeConnection
void removeConnection(const std::shared_ptr<Connection>& connection);
// update property value
void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue);
void getConnections(std::map<std::string, std::shared_ptr<Connection>> &connectionMap);
void getConnections(std::map<std::string, std::shared_ptr<Connectable>> &connectionMap);
void getFlowFileContainers(std::map<std::string, std::shared_ptr<Connectable>> &containers) const;
void drainConnections();
std::size_t getTotalFlowFileCount() const;
protected:
void startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler); // NOLINT
// version
int config_version_;
// Process Group Type
ProcessGroupType type_;
// Processors (ProcessNode) inside this process group which include Input/Output Port, Remote Process Group input/Output port
std::set<std::shared_ptr<Processor> > processors_;
std::set<std::shared_ptr<Processor> > failed_processors_;
std::set<ProcessGroup *> child_process_groups_;
// Connections between the processor inside the group;
std::set<std::shared_ptr<Connection> > connections_;
// Parent Process Group
ProcessGroup* parent_process_group_;
// Yield Period in Milliseconds
std::atomic<uint64_t> yield_period_msec_;
std::atomic<uint64_t> timeOut_;
std::atomic<int64_t> onschedule_retry_msec_;
// URL
std::string url_;
// local network interface
std::string local_network_interface_;
// Transmitting
std::atomic<bool> transmitting_;
// http proxy
utils::HTTPProxy proxy_;
std::string transport_protocol_;
// controller services
core::controller::ControllerServiceMap controller_service_map_;
private:
// Mutex for protection
mutable std::recursive_mutex mutex_;
// Logger
std::shared_ptr<logging::Logger> logger_;
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
ProcessGroup(const ProcessGroup &parent);
ProcessGroup &operator=(const ProcessGroup &parent);
static std::shared_ptr<utils::IdGenerator> id_generator_;
std::unique_ptr<utils::CallBackTimer> onScheduleTimer_;
};
} // namespace core
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org
#endif // LIBMINIFI_INCLUDE_CORE_PROCESSGROUP_H_