| /** |
| * @file ProcessGroup.cpp |
| * ProcessGroup class implementation |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "core/ProcessGroup.h" |
| #include <sys/time.h> |
| #include <time.h> |
| #include <vector> |
| #include <memory> |
| #include <string> |
| #include <queue> |
| #include <map> |
| #include <set> |
| #include <chrono> |
| #include <thread> |
| #include "core/Processor.h" |
| #include "core/logging/LoggerConfiguration.h" |
| |
| namespace org { |
| namespace apache { |
| namespace nifi { |
| namespace minifi { |
| namespace core { |
| |
| std::shared_ptr<utils::IdGenerator> ProcessGroup::id_generator_ = utils::IdGenerator::getIdGenerator(); |
| |
| ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, int version, ProcessGroup *parent) |
| : logger_(logging::LoggerFactory<ProcessGroup>::getLogger()), |
| name_(name), |
| type_(type), |
| config_version_(version), |
| parent_process_group_(parent) { |
| if (!uuid) |
| // Generate the global UUID for the flow record |
| id_generator_->generate(uuid_); |
| else |
| uuid_copy(uuid_, uuid); |
| |
| yield_period_msec_ = 0; |
| transmitting_ = false; |
| transport_protocol_ = "RAW"; |
| |
| logger_->log_debug("ProcessGroup %s created", name_); |
| } |
| |
| ProcessGroup::~ProcessGroup() { |
| for (auto &&connection : connections_) { |
| connection->drain(); |
| } |
| |
| for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); it != child_process_groups_.end(); ++it) { |
| ProcessGroup *processGroup(*it); |
| delete processGroup; |
| } |
| } |
| |
| bool ProcessGroup::isRootProcessGroup() { |
| std::lock_guard<std::recursive_mutex> lock(mutex_); |
| return (type_ == ROOT_PROCESS_GROUP); |
| } |
| |
| void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) { |
| std::lock_guard<std::recursive_mutex> lock(mutex_); |
| |
| if (processors_.find(processor) == processors_.end()) { |
| // We do not have the same processor in this process group yet |
| processors_.insert(processor); |
| logger_->log_debug("Add processor %s into process group %s", processor->getName(), name_); |
| } |
| } |
| |
| void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) { |
| std::lock_guard<std::recursive_mutex> lock(mutex_); |
| |
| if (processors_.find(processor) != processors_.end()) { |
| // We do have the same processor in this process group yet |
| processors_.erase(processor); |
| logger_->log_debug("Remove processor %s from process group %s", processor->getName(), name_); |
| } |
| } |
| |
| void ProcessGroup::addProcessGroup(ProcessGroup *child) { |
| std::lock_guard<std::recursive_mutex> lock(mutex_); |
| |
| if (child_process_groups_.find(child) == child_process_groups_.end()) { |
| // We do not have the same child process group in this process group yet |
| child_process_groups_.insert(child); |
| logger_->log_debug("Add child process group %s into process group %s", child->getName(), name_); |
| } |
| } |
| |
| void ProcessGroup::removeProcessGroup(ProcessGroup *child) { |
| std::lock_guard<std::recursive_mutex> lock(mutex_); |
| |
| if (child_process_groups_.find(child) != child_process_groups_.end()) { |
| // We do have the same child process group in this process group yet |
| child_process_groups_.erase(child); |
| logger_->log_debug("Remove child process group %s from process group %s", child->getName(), name_); |
| } |
| } |
| |
| void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) { |
| std::lock_guard<std::recursive_mutex> lock(mutex_); |
| |
| try { |
| // Start all the processor node, input and output ports |
| for (auto processor : processors_) { |
| logger_->log_debug("Starting %s", processor->getName()); |
| |
| if (!processor->isRunning() && processor->getScheduledState() != DISABLED) { |
| if (processor->getSchedulingStrategy() == TIMER_DRIVEN) |
| timeScheduler->schedule(processor); |
| else if (processor->getSchedulingStrategy() == EVENT_DRIVEN) |
| eventScheduler->schedule(processor); |
| } |
| } |
| // Start processing the group |
| for (auto processGroup : child_process_groups_) { |
| processGroup->startProcessing(timeScheduler, eventScheduler); |
| } |
| } catch (std::exception &exception) { |
| logger_->log_debug("Caught Exception %s", exception.what()); |
| throw; |
| } catch (...) { |
| logger_->log_debug("Caught Exception during process group start processing"); |
| throw; |
| } |
| } |
| |
| void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) { |
| std::lock_guard<std::recursive_mutex> lock(mutex_); |
| |
| try { |
| // Stop all the processor node, input and output ports |
| for (std::set<std::shared_ptr<Processor> >::iterator it = processors_.begin(); it != processors_.end(); ++it) { |
| std::shared_ptr<Processor> processor(*it); |
| if (processor->getSchedulingStrategy() == TIMER_DRIVEN) |
| timeScheduler->unschedule(processor); |
| else if (processor->getSchedulingStrategy() == EVENT_DRIVEN) |
| eventScheduler->unschedule(processor); |
| } |
| |
| for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); it != child_process_groups_.end(); ++it) { |
| ProcessGroup *processGroup(*it); |
| processGroup->stopProcessing(timeScheduler, eventScheduler); |
| } |
| } catch (std::exception &exception) { |
| logger_->log_debug("Caught Exception %s", exception.what()); |
| throw; |
| } catch (...) { |
| logger_->log_debug("Caught Exception during process group stop processing"); |
| throw; |
| } |
| } |
| |
| std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) { |
| std::lock_guard<std::recursive_mutex> lock(mutex_); |
| std::shared_ptr<Processor> ret = NULL; |
| for (auto processor : processors_) { |
| logger_->log_debug("find processor %s", processor->getName()); |
| uuid_t processorUUID; |
| |
| if (processor->getUUID(processorUUID)) { |
| char uuid_str[37]; // ex. "1b4e28ba-2fa1-11d2-883f-0016d3cca427" + "\0" |
| uuid_unparse_lower(processorUUID, uuid_str); |
| std::string processorUUIDstr = uuid_str; |
| uuid_unparse_lower(uuid, uuid_str); |
| std::string uuidStr = uuid_str; |
| if (processorUUIDstr == uuidStr) { |
| return processor; |
| } |
| } |
| } |
| for (auto processGroup : child_process_groups_) { |
| logger_->log_debug("find processor child %s", processGroup->getName()); |
| std::shared_ptr<Processor> processor = processGroup->findProcessor(uuid); |
| if (processor) |
| return processor; |
| } |
| return ret; |
| } |
| |
| void ProcessGroup::addControllerService(const std::string &nodeId, std::shared_ptr<core::controller::ControllerServiceNode> &node) { |
| controller_service_map_.put(nodeId, node); |
| } |
| |
| /** |
| * Find controllerservice node will search child groups until the nodeId is found. |
| * @param node node identifier |
| * @return controller service node, if it exists. |
| */ |
| std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findControllerService(const std::string &nodeId) { |
| return controller_service_map_.getControllerServiceNode(nodeId); |
| } |
| |
| void ProcessGroup::getAllProcessors(std::vector<std::shared_ptr<Processor>> &processor_vec) { |
| std::lock_guard<std::recursive_mutex> lock(mutex_); |
| std::shared_ptr<Processor> ret = NULL; |
| |
| for (auto processor : processors_) { |
| logger_->log_debug("Current processor is %s", processor->getName()); |
| processor_vec.push_back(processor); |
| } |
| for (auto processGroup : child_process_groups_) { |
| processGroup->getAllProcessors(processor_vec); |
| } |
| } |
| |
| std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &processorName) { |
| std::lock_guard<std::recursive_mutex> lock(mutex_); |
| std::shared_ptr<Processor> ret = NULL; |
| for (auto processor : processors_) { |
| logger_->log_debug("Current processor is %s", processor->getName()); |
| if (processor->getName() == processorName) |
| return processor; |
| } |
| for (auto processGroup : child_process_groups_) { |
| std::shared_ptr<Processor> processor = processGroup->findProcessor(processorName); |
| if (processor) |
| return processor; |
| } |
| return ret; |
| } |
| |
| void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) { |
| std::lock_guard<std::recursive_mutex> lock(mutex_); |
| for (auto processor : processors_) { |
| if (processor->getName() == processorName) { |
| processor->setProperty(propertyName, propertyValue); |
| } |
| } |
| for (auto processGroup : child_process_groups_) { |
| processGroup->updatePropertyValue(processorName, propertyName, propertyValue); |
| } |
| return; |
| } |
| |
| void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connection>> &connectionMap) { |
| for (auto connection : connections_) { |
| connectionMap[connection->getUUIDStr()] = connection; |
| connectionMap[connection->getName()] = connection; |
| } |
| for (auto processGroup : child_process_groups_) { |
| processGroup->getConnections(connectionMap); |
| } |
| } |
| |
| void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connectable>> &connectionMap) { |
| for (auto connection : connections_) { |
| connectionMap[connection->getUUIDStr()] = connection; |
| connectionMap[connection->getName()] = connection; |
| } |
| for (auto processGroup : child_process_groups_) { |
| processGroup->getConnections(connectionMap); |
| } |
| } |
| |
| void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) { |
| std::lock_guard<std::recursive_mutex> lock(mutex_); |
| |
| if (connections_.find(connection) == connections_.end()) { |
| // We do not have the same connection in this process group yet |
| connections_.insert(connection); |
| logger_->log_debug("Add connection %s into process group %s", connection->getName(), name_); |
| uuid_t sourceUUID; |
| std::shared_ptr<Processor> source = NULL; |
| connection->getSourceUUID(sourceUUID); |
| source = this->findProcessor(sourceUUID); |
| if (source) |
| source->addConnection(connection); |
| std::shared_ptr<Processor> destination = NULL; |
| uuid_t destinationUUID; |
| connection->getDestinationUUID(destinationUUID); |
| destination = this->findProcessor(destinationUUID); |
| if (destination && destination != source) |
| destination->addConnection(connection); |
| } |
| } |
| |
| void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) { |
| std::lock_guard<std::recursive_mutex> lock(mutex_); |
| |
| if (connections_.find(connection) != connections_.end()) { |
| // We do not have the same connection in this process group yet |
| connections_.erase(connection); |
| logger_->log_debug("Remove connection %s into process group %s", connection->getName(), name_); |
| uuid_t sourceUUID; |
| std::shared_ptr<Processor> source = NULL; |
| connection->getSourceUUID(sourceUUID); |
| source = this->findProcessor(sourceUUID); |
| if (source) |
| source->removeConnection(connection); |
| std::shared_ptr<Processor> destination = NULL; |
| uuid_t destinationUUID; |
| connection->getDestinationUUID(destinationUUID); |
| destination = this->findProcessor(destinationUUID); |
| if (destination && destination != source) |
| destination->removeConnection(connection); |
| } |
| } |
| |
| } /* namespace core */ |
| } /* namespace minifi */ |
| } /* namespace nifi */ |
| } /* namespace apache */ |
| } /* namespace org */ |