blob: d8e8ea0667bc729b59199848b71162e01d7c8c21 [file] [log] [blame]
/**
* @file ProcessGroup.cpp
* ProcessGroup class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "core/ProcessGroup.h"
#include <vector>
#include <memory>
#include <string>
#include <map>
#include <set>
#include <chrono>
#include <thread>
#include "core/Processor.h"
#include "core/state/ProcessorController.h"
#include "core/state/UpdateController.h"
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::core {
// Identifier generator shared by all ProcessGroup instances; every constructor below hands it to CoreComponentImpl.
std::shared_ptr<utils::IdGenerator> ProcessGroup::id_generator_ = utils::IdGenerator::getIdGenerator();
/**
 * Creates a process group with configuration version 0 and no parent group.
 * Delegates to the full (type, name, uuid, version, parent) constructor.
 */
ProcessGroup::ProcessGroup(ProcessGroupType type, std::string_view name, const utils::Identifier& uuid)
    : ProcessGroup(type, name, uuid, 0, nullptr) {
}
/**
 * Creates a process group with the given configuration version and no parent group.
 * Delegates to the full (type, name, uuid, version, parent) constructor.
 */
ProcessGroup::ProcessGroup(ProcessGroupType type, std::string_view name, const utils::Identifier& uuid, int version)
    : ProcessGroup(type, name, uuid, version, nullptr) {
}
/**
 * Full constructor; the other constructors with a uuid delegate here.
 *
 * @param type    kind of group; isRemoteProcessGroup() compares it against REMOTE_PROCESS_GROUP
 * @param name    group name
 * @param uuid    identifier forwarded to CoreComponentImpl together with id_generator_
 * @param version configuration version, stored in config_version_
 * @param parent  enclosing group or nullptr for a root group; kept as a raw (non-owning)
 *                pointer and walked by buildGroupPath()
 *
 * The yield period starts at 0ms, transmitting_ at false and the transport protocol at "RAW".
 */
ProcessGroup::ProcessGroup(ProcessGroupType type, std::string_view name, const utils::Identifier& uuid, int version, ProcessGroup* parent)
    : CoreComponentImpl(name, uuid, id_generator_),
      config_version_(version),
      type_(type),
      parent_process_group_(parent),
      yield_period_msec_(0ms),
      transmitting_(false),
      transport_protocol_("RAW"),
      logger_(logging::LoggerFactory<ProcessGroup>::getLogger()) {
  logger_->log_debug("ProcessGroup {} created", name_);
}
/**
 * Creates a root process group (no parent, configuration version 0) without an explicit UUID.
 *
 * An empty identifier (`{}`) is passed to CoreComponentImpl. Presumably CoreComponentImpl then
 * generates a UUID via id_generator_; TODO confirm in CoreComponentImpl.
 * The remaining members get the same initial values as in the full constructor.
 */
ProcessGroup::ProcessGroup(ProcessGroupType type, std::string_view name)
    : CoreComponentImpl(name, {}, id_generator_),
      config_version_(0),
      type_(type),
      parent_process_group_(nullptr),
      yield_period_msec_(0ms),
      transmitting_(false),
      transport_protocol_("RAW"),
      logger_(logging::LoggerFactory<ProcessGroup>::getLogger()) {
  logger_->log_debug("ProcessGroup {} created", name_);
}
/**
 * Destructor: stops the failed-processor retry timer, if one was created, then calls
 * drain(false) on every connection owned directly by this group. Child groups handle
 * their own connections in their own destructors.
 */
ProcessGroup::~ProcessGroup() {
  // The retry timer's callback captures `this`, so halt it before members go away.
  if (auto* const retry_timer = onScheduleTimer_.get()) {
    retry_timer->stop();
  }
  for (const auto& owned_connection : connections_) {
    owned_connection->drain(false);
  }
}
/**
 * @return true iff this group was constructed with type REMOTE_PROCESS_GROUP.
 */
bool ProcessGroup::isRemoteProcessGroup() {
  const bool is_remote = type_ == REMOTE_PROCESS_GROUP;
  return is_remote;
}
std::tuple<Processor*, bool> ProcessGroup::addProcessor(std::unique_ptr<Processor> processor) {
gsl_Expects(processor);
const auto name = processor->getName();
std::lock_guard<std::recursive_mutex> lock(mutex_);
processor->setProcessGroupUUIDStr(getUUIDStr());
processor->setProcessGroupName(getName());
processor->setProcessGroupPath(buildGroupPath());
const auto [iter, inserted] = processors_.insert(std::move(processor));
if (inserted) {
logger_->log_debug("Add processor {} into process group {}", name, name_);
} else {
logger_->log_debug("Not adding processor {} into process group {}, as it is already there", name, name_);
}
return std::make_tuple(iter->get(), inserted);
}
void ProcessGroup::addPort(std::unique_ptr<Port> port) {
auto [processor, inserted] = addProcessor(std::move(port));
if (inserted) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
ports_.insert(dynamic_cast<Port*>(processor));
}
}
void ProcessGroup::addProcessGroup(std::unique_ptr<ProcessGroup> child) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (!child_process_groups_.contains(child)) {
// We do not have the same child process group in this process group yet
logger_->log_debug("Add child process group {} into process group {}", child->getName(), name_);
child_process_groups_.emplace(std::move(child));
}
}
/**
 * Attempts to schedule every processor currently recorded in failed_processors_.
 *
 * Each pending processor is marked RUNNING and handed to the scheduling agent that matches
 * its scheduling strategy. A processor whose scheduling throws is:
 *  - marked STOPPED;
 *  - has onUnSchedule() called (exceptions from that are only logged);
 *  - kept in failed_processors_ for a later attempt.
 *
 * If something failed, no retry timer exists yet and the admin yield duration is positive,
 * a CallBackTimer with that interval is created whose callback calls this method again.
 * If nothing failed and a timer exists, that timer is stopped.
 *
 * NOTE(review): the timer lambda captures `this` and the three scheduler references, so all of
 * them must outlive the timer; stopProcessing() and the destructor stop it.
 * NOTE(review): onScheduleTimer_ is read and written here without holding mutex_.
 */
void ProcessGroup::startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler,
    EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& cronScheduler) {
  // Snapshot the pending processors under the lock; the scheduling below runs without holding mutex_.
  std::vector<Processor*> processors_to_schedule;
  {
    std::unique_lock<std::recursive_mutex> lock(mutex_);
    for (const auto& processor : failed_processors_) {
      processors_to_schedule.push_back(processor);
    }
  }
  std::set<Processor*> failed_processors;
  for (const auto processor : processors_to_schedule) {
    try {
      logger_->log_debug("Starting {}", processor->getName());
      // Marked RUNNING before scheduling; the handlers below revert it to STOPPED on failure.
      processor->setScheduledState(core::ScheduledState::RUNNING);
      switch (processor->getSchedulingStrategy()) {
        case TIMER_DRIVEN:
          timeScheduler.schedule(processor);
          break;
        case EVENT_DRIVEN:
          eventScheduler.schedule(processor);
          break;
        case CRON_DRIVEN:
          cronScheduler.schedule(processor);
          break;
      }
    }
    catch (const std::exception &e) {
      logger_->log_error("Failed to start processor {} ({}): {}", processor->getUUIDStr(), processor->getName(), e.what());
      processor->setScheduledState(core::ScheduledState::STOPPED);
      failed_processors.insert(processor);
    }
    catch (...) {
      logger_->log_error("Failed to start processor {} ({})", processor->getUUIDStr(), processor->getName());
      processor->setScheduledState(core::ScheduledState::STOPPED);
      failed_processors.insert(processor);
    }
  }
  // Call onUnSchedule() on each processor that failed to start; any exception is logged, not propagated.
  for (const auto processor : failed_processors) {
    try {
      processor->onUnSchedule();
    } catch (const std::exception& ex) {
      logger_->log_error("Exception occured during unscheduling processor: {} ({}), type: {}, what: {}", processor->getUUIDStr(), processor->getName(), typeid(ex).name(), ex.what());
    } catch (...) {
      logger_->log_error("Exception occured during unscheduling processor: {} ({}), type: {}", processor->getUUIDStr(), processor->getName(), getCurrentExceptionTypeName());
    }
  }
  // The admin yield duration comes from the configuration, should be equal in all three schedulers
  std::chrono::milliseconds admin_yield_duration = timeScheduler.getAdminYieldDuration();
  // Create the retry timer only if none exists yet, something failed, and a positive interval is configured.
  if (!onScheduleTimer_ && !failed_processors.empty() && admin_yield_duration > 0ms) {
    logger_->log_info("Retrying failed processors in {}", admin_yield_duration);
    auto func = [this, eventScheduler = &eventScheduler, cronScheduler = &cronScheduler, timeScheduler = &timeScheduler]() {
      this->startProcessingProcessors(*timeScheduler, *eventScheduler, *cronScheduler);
    };
    onScheduleTimer_ = std::make_unique<utils::CallBackTimer>(admin_yield_duration, func);
    onScheduleTimer_->start();
  } else if (failed_processors.empty() && onScheduleTimer_) {
    // Everything started, so stop retrying.
    // NOTE(review): when reached from the timer callback this runs on the timer's own
    // thread — confirm CallBackTimer::stop() supports being called from there.
    onScheduleTimer_->stop();
  }
  {
    // This round's failures become the pending set for the next attempt.
    std::unique_lock<std::recursive_mutex> lock(mutex_);
    failed_processors_ = std::move(failed_processors);
  }
}
/**
 * Starts this group and, recursively, all of its child groups.
 *
 * Every processor of this group is first recorded as "failed" (pending), so that
 * startProcessingProcessors() attempts to schedule all of them. Exceptions are logged
 * at debug level and rethrown unchanged.
 */
void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler,
    CronDrivenSchedulingAgent& cronScheduler) {
  try {
    {
      std::lock_guard<std::recursive_mutex> guard(mutex_);
      for (const auto& owned : processors_) {
        failed_processors_.insert(owned.get());
      }
    }
    // Processors, input and output ports of this group first, then the child groups.
    startProcessingProcessors(timeScheduler, eventScheduler, cronScheduler);
    for (const auto& child : child_process_groups_) {
      child->startProcessing(timeScheduler, eventScheduler, cronScheduler);
    }
  } catch (const std::exception& exception) {
    logger_->log_debug("Caught Exception {}", exception.what());
    throw;
  } catch (...) {
    logger_->log_debug("Caught Exception during process group start processing, type: {}", getCurrentExceptionTypeName());
    throw;
  }
}
/**
 * Unschedules this group's processors (optionally a filtered subset) and recurses into child groups.
 *
 * The failed-processor retry timer is stopped and destroyed first. A processor is unscheduled
 * from the agent matching its scheduling strategy when `filter` is empty or returns true for it.
 * The same filter is passed to child groups. Exceptions are logged at debug level and rethrown.
 */
void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler,
    CronDrivenSchedulingAgent& cronScheduler, const std::function<bool(const Processor*)>& filter) {
  // Snapshot under the lock; the unscheduling below runs without holding mutex_.
  std::set<Processor*> snapshot;
  {
    std::lock_guard<std::recursive_mutex> guard(mutex_);
    for (const auto& owned : processors_) {
      snapshot.insert(owned.get());
    }
  }
  if (onScheduleTimer_) {
    onScheduleTimer_->stop();
  }
  onScheduleTimer_.reset();
  try {
    // Stop all the processor node, input and output ports
    for (Processor* const candidate : snapshot) {
      const bool selected = !filter || filter(candidate);
      if (!selected) {
        continue;
      }
      logger_->log_debug("Stopping {}", candidate->getName());
      switch (candidate->getSchedulingStrategy()) {
        case TIMER_DRIVEN:
          timeScheduler.unschedule(candidate);
          break;
        case EVENT_DRIVEN:
          eventScheduler.unschedule(candidate);
          break;
        case CRON_DRIVEN:
          cronScheduler.unschedule(candidate);
          break;
      }
    }
    for (const auto& child : child_process_groups_) {
      child->stopProcessing(timeScheduler, eventScheduler, cronScheduler, filter);
    }
  } catch (const std::exception& exception) {
    logger_->log_debug("Caught Exception type: {}, what: {}", typeid(exception).name(), exception.what());
    throw;
  } catch (...) {
    logger_->log_debug("Caught Exception during process group stop processing, type: {}", getCurrentExceptionTypeName());
    throw;
  }
}
/**
 * Finds a processor whose UUID is set and equals `uuid`.
 * The search (and whether child groups are included) is delegated to findProcessor().
 */
Processor* ProcessGroup::findProcessorById(const utils::Identifier& uuid, Traverse traverse) const {
  const auto has_uuid = [this, &uuid](const std::unique_ptr<Processor>& candidate) {
    logger_->log_trace("Searching for processor by id, checking processor {}", candidate->getName());
    utils::Identifier candidate_uuid = candidate->getUUID();
    return candidate_uuid && uuid == candidate_uuid;
  };
  return findProcessor(has_uuid, traverse);
}
/**
 * Finds a processor whose name equals `processorName`.
 * The search (and whether child groups are included) is delegated to findProcessor().
 */
Processor* ProcessGroup::findProcessorByName(const std::string &processorName, Traverse traverse) const {
  const auto has_name = [this, &processorName](const std::unique_ptr<Processor>& candidate) {
    logger_->log_trace("Searching for processor by name, checking processor {}", candidate->getName());
    return candidate->getName() == processorName;
  };
  return findProcessor(has_name, traverse);
}
/**
 * Registers a controller service node under `nodeId` in this group's controller_service_map_.
 * What happens if `nodeId` is already present depends on the map's put(), which is not visible here.
 */
void ProcessGroup::addControllerService(const std::string &nodeId, const std::shared_ptr<core::controller::ControllerServiceNode> &node) {
  controller_service_map_.put(nodeId, node);
}
/**
 * Looks up a controller service node by id.
 *
 * This group's own map is searched first. With Traverse::IncludeChildren, child groups
 * are then searched recursively, in iteration order, and the first hit is returned.
 *
 * @return the matching node, or nullptr if none was found
 */
core::controller::ControllerServiceNode* ProcessGroup::findControllerService(const std::string &nodeId, Traverse traverse) const {
  if (auto local_match = controller_service_map_.get(nodeId)) {
    return local_match;
  }
  if (traverse != Traverse::IncludeChildren) {
    return nullptr;
  }
  for (const auto& child : child_process_groups_) {
    if (auto nested_match = child->findControllerService(nodeId, traverse)) {
      return nested_match;
    }
  }
  return nullptr;
}
std::vector<const core::controller::ControllerServiceNode*> ProcessGroup::getAllControllerServices() const {
std::vector<const core::controller::ControllerServiceNode*> controller_service_nodes;
for (const auto& node : controller_service_map_.getAllControllerServices()) {
controller_service_nodes.push_back(node.get());
}
return controller_service_nodes;
}
/**
 * Appends non-owning pointers to all processors of this group, then of every child group
 * (recursively), to `processor_vec`. Existing contents of `processor_vec` are kept.
 */
void ProcessGroup::getAllProcessors(std::vector<Processor*>& processor_vec) const {
  std::lock_guard<std::recursive_mutex> guard(mutex_);
  for (const auto& owned : processors_) {
    Processor* const raw = owned.get();
    logger_->log_trace("Collecting all processors, current processor is {}", raw->getName());
    processor_vec.push_back(raw);
  }
  for (const auto& child : child_process_groups_) {
    child->getAllProcessors(processor_vec);
  }
}
void ProcessGroup::updatePropertyValue(const std::string& processorName, const std::string& propertyName, const std::string& propertyValue) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
for (auto& processor : processors_) {
if (processor->getName() == processorName) {
auto result = processor->setProperty(propertyName, propertyValue);
if (!result) {
logger_->log_error("Failed to update property '{}' for processor '{}': {}", propertyName, processorName, result.error().message());
}
}
}
for (auto& processGroup : child_process_groups_) {
processGroup->updatePropertyValue(processorName, propertyName, propertyValue);
}
}
/**
 * Indexes every connection of this group and its child groups (recursively) into
 * `connectionMap`, under both its UUID string and its name. Later entries overwrite
 * earlier ones with the same key.
 */
void ProcessGroup::getConnections(std::map<std::string, Connection*>& connectionMap) {
  for (const auto& owned : connections_) {
    auto* const raw = owned.get();
    connectionMap[raw->getUUIDStr()] = raw;
    connectionMap[raw->getName()] = raw;
  }
  for (const auto& child : child_process_groups_) {
    child->getConnections(connectionMap);
  }
}
/**
 * Connectable-typed variant of getConnections(): indexes every connection of this group and
 * its child groups (recursively) under both its UUID string and its name. Later entries
 * overwrite earlier ones with the same key.
 */
void ProcessGroup::getConnections(std::map<std::string, Connectable*>& connectionMap) {
  for (const auto& owned : connections_) {
    auto* const raw = owned.get();
    connectionMap[raw->getUUIDStr()] = raw;
    connectionMap[raw->getName()] = raw;
  }
  for (const auto& child : child_process_groups_) {
    child->getConnections(connectionMap);
  }
}
/**
 * Collects everything that can hold FlowFiles, recursively through child groups:
 * connections (keyed by UUID string and by name) and processors (keyed by UUID string).
 */
void ProcessGroup::getFlowFileContainers(std::map<std::string, Connectable*>& containers) const {
  for (const auto& owned_connection : connections_) {
    auto* const raw_connection = owned_connection.get();
    containers[raw_connection->getUUIDStr()] = raw_connection;
    containers[raw_connection->getName()] = raw_connection;
  }
  // processors can also own FlowFiles
  for (const auto& owned_processor : processors_) {
    containers[owned_processor->getUUIDStr()] = owned_processor.get();
  }
  for (const auto& child : child_process_groups_) {
    child->getFlowFileContainers(containers);
  }
}
/**
 * Returns the first port in `ports` (set iteration order) whose UUID is set and equals `uuid`,
 * or nullptr if there is none.
 */
Port* ProcessGroup::findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid) {
  for (Port* const candidate : ports) {
    utils::Identifier candidate_uuid = candidate->getUUID();
    if (candidate_uuid && uuid == candidate_uuid) {
      return candidate;
    }
  }
  return nullptr;
}
/**
 * Looks up one of this group's own ports (ports_) by UUID while holding mutex_.
 * @return the matching port, or nullptr
 */
Port* ProcessGroup::findPortById(const utils::Identifier& uuid) const {
  std::lock_guard<std::recursive_mutex> lock(mutex_);
  return findPortById(ports_, uuid);
}
/**
 * Looks up a port by UUID among the ports of this group's direct child groups
 * (grandchildren are not searched).
 * @return the first matching port in child iteration order, or nullptr
 */
Port* ProcessGroup::findChildPortById(const utils::Identifier& uuid) const {
  std::lock_guard<std::recursive_mutex> guard(mutex_);
  Port* match = nullptr;
  for (auto it = child_process_groups_.begin(); match == nullptr && it != child_process_groups_.end(); ++it) {
    match = findPortById((*it)->getPorts(), uuid);
  }
  return match;
}
void ProcessGroup::addConnection(std::unique_ptr<Connection> connection) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
auto [insertPos, inserted] = connections_.insert(std::move(connection));
if (!inserted) {
return;
}
auto& insertedConnection = *insertPos;
logger_->log_debug("Add connection {} into process group {}", insertedConnection->getName(), name_);
// only allow connections between processors of the same process group or in/output ports of child process groups
// check input and output ports connection restrictions inside and outside a process group
Processor* source = findPortById(insertedConnection->getSourceUUID());
if (source && dynamic_cast<Port*>(source)->getPortType() == PortType::OUTPUT) {
logger_->log_error("Output port [id = '{}'] cannot be a source inside the process group in the connection [name = '{}', id = '{}']",
insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
source = nullptr;
} else if (!source) {
source = findChildPortById(insertedConnection->getSourceUUID());
if (source && dynamic_cast<Port*>(source)->getPortType() == PortType::INPUT) {
logger_->log_error("Input port [id = '{}'] cannot be a source outside the process group in the connection [name = '{}', id = '{}']",
insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
source = nullptr;
} else if (!source) {
source = findProcessorById(insertedConnection->getSourceUUID(), Traverse::ExcludeChildren);
if (!source) {
logger_->log_error("Cannot find the source processor with id '{}' for the connection [name = '{}', id = '{}']",
insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
}
}
}
if (source) {
source->addConnection(insertedConnection.get());
}
Processor* destination = findPortById(insertedConnection->getDestinationUUID());
if (destination && dynamic_cast<Port*>(destination)->getPortType() == PortType::INPUT) {
logger_->log_error("Input port [id = '{}'] cannot be a destination inside the process group in the connection [name = '{}', id = '{}']",
insertedConnection->getDestinationUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
destination = nullptr;
} else if (!destination) {
destination = findChildPortById(insertedConnection->getDestinationUUID());
if (destination && dynamic_cast<Port*>(destination)->getPortType() == PortType::OUTPUT) {
logger_->log_error("Output port [id = '{}'] cannot be a destination outside the process group in the connection [name = '{}', id = '{}']",
insertedConnection->getDestinationUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
destination = nullptr;
} else if (!destination) {
destination = findProcessorById(insertedConnection->getDestinationUUID(), Traverse::ExcludeChildren);
if (!destination) {
logger_->log_error("Cannot find the destination processor with id '{}' for the connection [name = '{}', id = '{}']",
insertedConnection->getDestinationUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
}
}
}
if (destination && destination != source) {
destination->addConnection(insertedConnection.get());
}
}
void ProcessGroup::drainConnections() {
for (auto&& connection : connections_) {
connection->drain(false);
}
for (auto& childGroup : child_process_groups_) {
childGroup->drainConnections();
}
}
/**
 * @return the sum of getQueueSize() over this group's connections plus the totals of all
 *         child groups (recursively). Each queue size is converted with gsl::narrow, which
 *         reports a failure if the value does not fit in std::size_t.
 */
std::size_t ProcessGroup::getTotalFlowFileCount() const {
  std::size_t total{0};
  for (const auto& owned : connections_) {
    total += gsl::narrow<std::size_t>(owned->getQueueSize());
  }
  for (const auto& child : child_process_groups_) {
    total += child->getTotalFlowFileCount();
  }
  return total;
}
void ProcessGroup::verify() const {
for (const auto& processor : processors_) {
processor->validateAnnotations();
}
}
/**
 * Stores the parameter context pointer for this group (raw pointer; presumably owned
 * elsewhere — confirm with the flow configuration code).
 */
void ProcessGroup::setParameterContext(ParameterContext* parameter_context) {
  parameter_context_ = parameter_context;
}
/**
 * @return the parameter context pointer stored by setParameterContext(), possibly nullptr
 */
ParameterContext* ProcessGroup::getParameterContext() const {
  return parameter_context_;
}
/**
 * Builds this group's path from the root, e.g. "root / child / this", by walking the
 * parent chain and prefixing each ancestor's name followed by " / ".
 */
std::string ProcessGroup::buildGroupPath() const {
  std::lock_guard<std::recursive_mutex> guard(mutex_);
  std::string result = name_;
  for (auto ancestor = parent_process_group_; ancestor != nullptr; ancestor = ancestor->getParent()) {
    result.insert(0, ancestor->getName() + " / ");
  }
  return result;
}
} // namespace org::apache::nifi::minifi::core