blob: ee88d7952f5f90e7a250aaaa0b853a84c9b8ea88 [file] [log] [blame]
/**
* @file ProcessGroup.cpp
* ProcessGroup class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "core/ProcessGroup.h"
#include <time.h>
#include <vector>
#include <memory>
#include <string>
#include <queue>
#include <map>
#include <set>
#include <chrono>
#include <thread>
#include "core/Processor.h"
#include "core/logging/LoggerConfiguration.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {
std::shared_ptr<utils::IdGenerator> ProcessGroup::id_generator_ = utils::IdGenerator::getIdGenerator();
ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, utils::Identifier &uuid)
: ProcessGroup(type, name, uuid, 0, 0) {
}
ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, utils::Identifier &uuid, int version)
: ProcessGroup(type, name, uuid, version, 0) {
}
ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, utils::Identifier &uuid, int version, ProcessGroup *parent)
: logger_(logging::LoggerFactory<ProcessGroup>::getLogger()),
name_(name),
type_(type),
config_version_(version),
parent_process_group_(parent) {
if (uuid == nullptr) {
id_generator_->generate(uuid_);
} else {
uuid_ = uuid;
}
yield_period_msec_ = 0;
if (parent_process_group_ != 0) {
onschedule_retry_msec_ = parent_process_group_->getOnScheduleRetryPeriod();
} else {
onschedule_retry_msec_ = ONSCHEDULE_RETRY_INTERVAL;
}
transmitting_ = false;
transport_protocol_ = "RAW";
logger_->log_debug("ProcessGroup %s created", name_);
}
ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name)
: logger_(logging::LoggerFactory<ProcessGroup>::getLogger()),
name_(name),
type_(type),
config_version_(0),
parent_process_group_(0) {
id_generator_->generate(uuid_);
yield_period_msec_ = 0;
onschedule_retry_msec_ = ONSCHEDULE_RETRY_INTERVAL;
transmitting_ = false;
transport_protocol_ = "RAW";
logger_->log_debug("ProcessGroup %s created", name_);
}
ProcessGroup::~ProcessGroup() {
if (onScheduleTimer_) {
onScheduleTimer_->stop();
}
for (auto&& connection : connections_) {
connection->drain(false);
}
for (ProcessGroup* childGroup : child_process_groups_) {
delete childGroup;
}
}
bool ProcessGroup::isRootProcessGroup() {
std::lock_guard<std::recursive_mutex> lock(mutex_);
return (type_ == ROOT_PROCESS_GROUP);
}
void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (processors_.find(processor) == processors_.end()) {
// We do not have the same processor in this process group yet
processors_.insert(processor);
logger_->log_debug("Add processor %s into process group %s", processor->getName(), name_);
}
}
void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (processors_.find(processor) != processors_.end()) {
// We do have the same processor in this process group yet
processors_.erase(processor);
logger_->log_debug("Remove processor %s from process group %s", processor->getName(), name_);
}
}
void ProcessGroup::addProcessGroup(ProcessGroup *child) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (child_process_groups_.find(child) == child_process_groups_.end()) {
// We do not have the same child process group in this process group yet
child_process_groups_.insert(child);
logger_->log_debug("Add child process group %s into process group %s", child->getName(), name_);
}
}
void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (child_process_groups_.find(child) != child_process_groups_.end()) {
// We do have the same child process group in this process group yet
child_process_groups_.erase(child);
logger_->log_debug("Remove child process group %s from process group %s", child->getName(), name_);
}
}
void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler,
const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) {
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::set<std::shared_ptr<Processor> > failed_processors;
for (const auto &processor : failed_processors_) {
try {
logger_->log_debug("Starting %s", processor->getName());
switch (processor->getSchedulingStrategy()) {
case TIMER_DRIVEN:
timeScheduler->schedule(processor);
break;
case EVENT_DRIVEN:
eventScheduler->schedule(processor);
break;
case CRON_DRIVEN:
cronScheduler->schedule(processor);
break;
}
}
catch (const std::exception &e) {
logger_->log_error("Failed to start processor %s (%s): %s", processor->getUUIDStr(), processor->getName(), e.what());
failed_processors.insert(processor);
}
catch (...) {
logger_->log_error("Failed to start processor %s (%s)", processor->getUUIDStr(), processor->getName());
failed_processors.insert(processor);
}
}
failed_processors_ = std::move(failed_processors);
for (auto& processor : failed_processors_) {
try {
processor->onUnSchedule();
} catch (...) {
logger_->log_error("Exception occured during unscheduling processor: %s (%s)", processor->getUUIDStr(), processor->getName());
}
}
if (!onScheduleTimer_ && !failed_processors_.empty() && onschedule_retry_msec_ > 0) {
logger_->log_info("Retrying failed processors in %lld msec", onschedule_retry_msec_.load());
auto func = [this, eventScheduler, cronScheduler, timeScheduler]() {
this->startProcessingProcessors(timeScheduler, eventScheduler, cronScheduler);
};
onScheduleTimer_.reset(new utils::CallBackTimer(std::chrono::milliseconds(onschedule_retry_msec_), func));
onScheduleTimer_->start();
} else if (failed_processors_.empty() && onScheduleTimer_) {
onScheduleTimer_->stop();
}
}
void ProcessGroup::startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
try {
failed_processors_ = processors_; // All processors are marked as failed.
// Start all the processor node, input and output ports
startProcessingProcessors(timeScheduler, eventScheduler, cronScheduler);
// Start processing the group
for (auto processGroup : child_process_groups_) {
processGroup->startProcessing(timeScheduler, eventScheduler, cronScheduler);
}
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
throw;
} catch (...) {
logger_->log_debug("Caught Exception during process group start processing");
throw;
}
}
void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (onScheduleTimer_) {
onScheduleTimer_->stop();
}
onScheduleTimer_.reset();
try {
// Stop all the processor node, input and output ports
for (const auto &processor : processors_) {
logger_->log_debug("Stopping %s", processor->getName());
switch (processor->getSchedulingStrategy()) {
case TIMER_DRIVEN:
timeScheduler->unschedule(processor);
break;
case EVENT_DRIVEN:
eventScheduler->unschedule(processor);
break;
case CRON_DRIVEN:
cronScheduler->unschedule(processor);
break;
}
}
for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); it != child_process_groups_.end(); ++it) {
ProcessGroup *processGroup(*it);
processGroup->stopProcessing(timeScheduler, eventScheduler, cronScheduler);
}
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
throw;
} catch (...) {
logger_->log_debug("Caught Exception during process group stop processing");
throw;
}
}
std::shared_ptr<Processor> ProcessGroup::findProcessor(utils::Identifier &uuid) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
std::shared_ptr<Processor> ret = NULL;
for (auto processor : processors_) {
logger_->log_debug("find processor %s", processor->getName());
utils::Identifier processorUUID;
if (processor->getUUID(processorUUID)) {
if (uuid == processorUUID) {
return processor;
}
}
}
for (auto processGroup : child_process_groups_) {
logger_->log_debug("find processor child %s", processGroup->getName());
std::shared_ptr<Processor> processor = processGroup->findProcessor(uuid);
if (processor)
return processor;
}
return ret;
}
void ProcessGroup::addControllerService(const std::string &nodeId, std::shared_ptr<core::controller::ControllerServiceNode> &node) {
controller_service_map_.put(nodeId, node);
}
/**
* Find controllerservice node will search child groups until the nodeId is found.
* @param node node identifier
* @return controller service node, if it exists.
*/
std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findControllerService(const std::string &nodeId) {
return controller_service_map_.getControllerServiceNode(nodeId);
}
void ProcessGroup::getAllProcessors(std::vector<std::shared_ptr<Processor>> &processor_vec) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
std::shared_ptr<Processor> ret = NULL;
for (auto processor : processors_) {
logger_->log_debug("Current processor is %s", processor->getName());
processor_vec.push_back(processor);
}
for (auto processGroup : child_process_groups_) {
processGroup->getAllProcessors(processor_vec);
}
}
std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &processorName) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
std::shared_ptr<Processor> ret = NULL;
for (auto processor : processors_) {
logger_->log_debug("Current processor is %s", processor->getName());
if (processor->getName() == processorName)
return processor;
}
for (auto processGroup : child_process_groups_) {
std::shared_ptr<Processor> processor = processGroup->findProcessor(processorName);
if (processor)
return processor;
}
return ret;
}
void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
for (auto processor : processors_) {
if (processor->getName() == processorName) {
processor->setProperty(propertyName, propertyValue);
}
}
for (auto processGroup : child_process_groups_) {
processGroup->updatePropertyValue(processorName, propertyName, propertyValue);
}
return;
}
void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connection>> &connectionMap) {
for (auto connection : connections_) {
connectionMap[connection->getUUIDStr()] = connection;
connectionMap[connection->getName()] = connection;
}
for (auto processGroup : child_process_groups_) {
processGroup->getConnections(connectionMap);
}
}
void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connectable>> &connectionMap) {
for (auto connection : connections_) {
connectionMap[connection->getUUIDStr()] = connection;
connectionMap[connection->getName()] = connection;
}
for (auto processGroup : child_process_groups_) {
processGroup->getConnections(connectionMap);
}
}
void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (connections_.find(connection) == connections_.end()) {
// We do not have the same connection in this process group yet
connections_.insert(connection);
logger_->log_debug("Add connection %s into process group %s", connection->getName(), name_);
utils::Identifier sourceUUID;
std::shared_ptr<Processor> source = NULL;
connection->getSourceUUID(sourceUUID);
source = this->findProcessor(sourceUUID);
if (source)
source->addConnection(connection);
std::shared_ptr<Processor> destination = NULL;
utils::Identifier destinationUUID;
connection->getDestinationUUID(destinationUUID);
destination = this->findProcessor(destinationUUID);
if (destination && destination != source)
destination->addConnection(connection);
}
}
void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (connections_.find(connection) != connections_.end()) {
// We do not have the same connection in this process group yet
connections_.erase(connection);
logger_->log_debug("Remove connection %s into process group %s", connection->getName(), name_);
utils::Identifier sourceUUID;
std::shared_ptr<Processor> source = NULL;
connection->getSourceUUID(sourceUUID);
source = this->findProcessor(sourceUUID);
if (source)
source->removeConnection(connection);
std::shared_ptr<Processor> destination = NULL;
utils::Identifier destinationUUID;
connection->getDestinationUUID(destinationUUID);
destination = this->findProcessor(destinationUUID);
if (destination && destination != source)
destination->removeConnection(connection);
}
}
void ProcessGroup::drainConnections() {
for (auto&& connection : connections_) {
connection->drain(false);
}
for (ProcessGroup* childGroup : child_process_groups_) {
childGroup->drainConnections();
}
}
} /* namespace core */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */