blob: 205f9f2e68903c86372fd26c4b2d6ba8ec54283d [file] [log] [blame]
/**
* @file ProcessGroup.cpp
* ProcessGroup class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "core/ProcessGroup.h"
#include <sys/time.h>
#include <time.h>
#include <vector>
#include <memory>
#include <string>
#include <queue>
#include <map>
#include <set>
#include <chrono>
#include <thread>
#include "core/Processor.h"
#include "core/logging/LoggerConfiguration.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {
std::shared_ptr<utils::IdGenerator> ProcessGroup::id_generator_ = utils::IdGenerator::getIdGenerator();
ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, int version, ProcessGroup *parent)
: logger_(logging::LoggerFactory<ProcessGroup>::getLogger()),
name_(name),
type_(type),
config_version_(version),
parent_process_group_(parent) {
if (!uuid)
// Generate the global UUID for the flow record
id_generator_->generate(uuid_);
else
uuid_copy(uuid_, uuid);
yield_period_msec_ = 0;
transmitting_ = false;
transport_protocol_ = "RAW";
logger_->log_debug("ProcessGroup %s created", name_);
}
ProcessGroup::~ProcessGroup() {
for (auto &&connection : connections_) {
connection->drain();
}
for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); it != child_process_groups_.end(); ++it) {
ProcessGroup *processGroup(*it);
delete processGroup;
}
}
bool ProcessGroup::isRootProcessGroup() {
std::lock_guard<std::recursive_mutex> lock(mutex_);
return (type_ == ROOT_PROCESS_GROUP);
}
void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (processors_.find(processor) == processors_.end()) {
// We do not have the same processor in this process group yet
processors_.insert(processor);
logger_->log_debug("Add processor %s into process group %s", processor->getName(), name_);
}
}
void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (processors_.find(processor) != processors_.end()) {
// We do have the same processor in this process group yet
processors_.erase(processor);
logger_->log_debug("Remove processor %s from process group %s", processor->getName(), name_);
}
}
void ProcessGroup::addProcessGroup(ProcessGroup *child) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (child_process_groups_.find(child) == child_process_groups_.end()) {
// We do not have the same child process group in this process group yet
child_process_groups_.insert(child);
logger_->log_debug("Add child process group %s into process group %s", child->getName(), name_);
}
}
void ProcessGroup::removeProcessGroup(ProcessGroup *child) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (child_process_groups_.find(child) != child_process_groups_.end()) {
// We do have the same child process group in this process group yet
child_process_groups_.erase(child);
logger_->log_debug("Remove child process group %s from process group %s", child->getName(), name_);
}
}
void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
try {
// Start all the processor node, input and output ports
for (auto processor : processors_) {
logger_->log_debug("Starting %s", processor->getName());
if (!processor->isRunning() && processor->getScheduledState() != DISABLED) {
if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
timeScheduler->schedule(processor);
else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
eventScheduler->schedule(processor);
}
}
// Start processing the group
for (auto processGroup : child_process_groups_) {
processGroup->startProcessing(timeScheduler, eventScheduler);
}
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
throw;
} catch (...) {
logger_->log_debug("Caught Exception during process group start processing");
throw;
}
}
void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
try {
// Stop all the processor node, input and output ports
for (std::set<std::shared_ptr<Processor> >::iterator it = processors_.begin(); it != processors_.end(); ++it) {
std::shared_ptr<Processor> processor(*it);
if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
timeScheduler->unschedule(processor);
else if (processor->getSchedulingStrategy() == EVENT_DRIVEN)
eventScheduler->unschedule(processor);
}
for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); it != child_process_groups_.end(); ++it) {
ProcessGroup *processGroup(*it);
processGroup->stopProcessing(timeScheduler, eventScheduler);
}
} catch (std::exception &exception) {
logger_->log_debug("Caught Exception %s", exception.what());
throw;
} catch (...) {
logger_->log_debug("Caught Exception during process group stop processing");
throw;
}
}
std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
std::shared_ptr<Processor> ret = NULL;
for (auto processor : processors_) {
logger_->log_debug("find processor %s", processor->getName());
uuid_t processorUUID;
if (processor->getUUID(processorUUID)) {
char uuid_str[37]; // ex. "1b4e28ba-2fa1-11d2-883f-0016d3cca427" + "\0"
uuid_unparse_lower(processorUUID, uuid_str);
std::string processorUUIDstr = uuid_str;
uuid_unparse_lower(uuid, uuid_str);
std::string uuidStr = uuid_str;
if (processorUUIDstr == uuidStr) {
return processor;
}
}
}
for (auto processGroup : child_process_groups_) {
logger_->log_debug("find processor child %s", processGroup->getName());
std::shared_ptr<Processor> processor = processGroup->findProcessor(uuid);
if (processor)
return processor;
}
return ret;
}
void ProcessGroup::addControllerService(const std::string &nodeId, std::shared_ptr<core::controller::ControllerServiceNode> &node) {
controller_service_map_.put(nodeId, node);
}
/**
* Find controllerservice node will search child groups until the nodeId is found.
* @param node node identifier
* @return controller service node, if it exists.
*/
std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findControllerService(const std::string &nodeId) {
return controller_service_map_.getControllerServiceNode(nodeId);
}
void ProcessGroup::getAllProcessors(std::vector<std::shared_ptr<Processor>> &processor_vec) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
std::shared_ptr<Processor> ret = NULL;
for (auto processor : processors_) {
logger_->log_debug("Current processor is %s", processor->getName());
processor_vec.push_back(processor);
}
for (auto processGroup : child_process_groups_) {
processGroup->getAllProcessors(processor_vec);
}
}
std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &processorName) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
std::shared_ptr<Processor> ret = NULL;
for (auto processor : processors_) {
logger_->log_debug("Current processor is %s", processor->getName());
if (processor->getName() == processorName)
return processor;
}
for (auto processGroup : child_process_groups_) {
std::shared_ptr<Processor> processor = processGroup->findProcessor(processorName);
if (processor)
return processor;
}
return ret;
}
void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
for (auto processor : processors_) {
if (processor->getName() == processorName) {
processor->setProperty(propertyName, propertyValue);
}
}
for (auto processGroup : child_process_groups_) {
processGroup->updatePropertyValue(processorName, propertyName, propertyValue);
}
return;
}
void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connection>> &connectionMap) {
for (auto connection : connections_) {
connectionMap[connection->getUUIDStr()] = connection;
connectionMap[connection->getName()] = connection;
}
for (auto processGroup : child_process_groups_) {
processGroup->getConnections(connectionMap);
}
}
void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connectable>> &connectionMap) {
for (auto connection : connections_) {
connectionMap[connection->getUUIDStr()] = connection;
connectionMap[connection->getName()] = connection;
}
for (auto processGroup : child_process_groups_) {
processGroup->getConnections(connectionMap);
}
}
void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (connections_.find(connection) == connections_.end()) {
// We do not have the same connection in this process group yet
connections_.insert(connection);
logger_->log_debug("Add connection %s into process group %s", connection->getName(), name_);
uuid_t sourceUUID;
std::shared_ptr<Processor> source = NULL;
connection->getSourceUUID(sourceUUID);
source = this->findProcessor(sourceUUID);
if (source)
source->addConnection(connection);
std::shared_ptr<Processor> destination = NULL;
uuid_t destinationUUID;
connection->getDestinationUUID(destinationUUID);
destination = this->findProcessor(destinationUUID);
if (destination && destination != source)
destination->addConnection(connection);
}
}
void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (connections_.find(connection) != connections_.end()) {
// We do not have the same connection in this process group yet
connections_.erase(connection);
logger_->log_debug("Remove connection %s into process group %s", connection->getName(), name_);
uuid_t sourceUUID;
std::shared_ptr<Processor> source = NULL;
connection->getSourceUUID(sourceUUID);
source = this->findProcessor(sourceUUID);
if (source)
source->removeConnection(connection);
std::shared_ptr<Processor> destination = NULL;
uuid_t destinationUUID;
connection->getDestinationUUID(destinationUUID);
destination = this->findProcessor(destinationUUID);
if (destination && destination != source)
destination->removeConnection(connection);
}
}
} /* namespace core */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */