blob: 70ee9d7bcf513926ae289bef682ea37c0fd6f55b [file] [log] [blame]
/**
* @file ProcessGroup.cpp
* ProcessGroup class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sys/time.h>
#include <time.h>
#include <chrono>
#include <thread>
#include "ProcessGroup.h"
#include "Processor.h"
ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, ProcessGroup *parent)
: _name(name),
_type(type),
_parentProcessGroup(parent)
{
if (!uuid)
// Generate the global UUID for the flow record
uuid_generate(_uuid);
else
uuid_copy(_uuid, uuid);
_yieldPeriodMsec = 0;
_transmitting = false;
_logger = Logger::getLogger();
_logger->log_info("ProcessGroup %s created", _name.c_str());
}
ProcessGroup::~ProcessGroup()
{
for (std::set<Connection *>::iterator it = _connections.begin(); it != _connections.end(); ++it)
{
Connection *connection = *it;
connection->drain();
delete connection;
}
for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
{
ProcessGroup *processGroup(*it);
delete processGroup;
}
for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it)
{
Processor *processor(*it);
delete processor;
}
}
bool ProcessGroup::isRootProcessGroup()
{
std::lock_guard<std::mutex> lock(_mtx);
return (_type == ROOT_PROCESS_GROUP);
}
void ProcessGroup::addProcessor(Processor *processor)
{
std::lock_guard<std::mutex> lock(_mtx);
if (_processors.find(processor) == _processors.end())
{
// We do not have the same processor in this process group yet
_processors.insert(processor);
_logger->log_info("Add processor %s into process group %s",
processor->getName().c_str(), _name.c_str());
}
}
void ProcessGroup::removeProcessor(Processor *processor)
{
std::lock_guard<std::mutex> lock(_mtx);
if (_processors.find(processor) != _processors.end())
{
// We do have the same processor in this process group yet
_processors.erase(processor);
_logger->log_info("Remove processor %s from process group %s",
processor->getName().c_str(), _name.c_str());
}
}
void ProcessGroup::addProcessGroup(ProcessGroup *child)
{
std::lock_guard<std::mutex> lock(_mtx);
if (_childProcessGroups.find(child) == _childProcessGroups.end())
{
// We do not have the same child process group in this process group yet
_childProcessGroups.insert(child);
_logger->log_info("Add child process group %s into process group %s",
child->getName().c_str(), _name.c_str());
}
}
void ProcessGroup::removeProcessGroup(ProcessGroup *child)
{
std::lock_guard<std::mutex> lock(_mtx);
if (_childProcessGroups.find(child) != _childProcessGroups.end())
{
// We do have the same child process group in this process group yet
_childProcessGroups.erase(child);
_logger->log_info("Remove child process group %s from process group %s",
child->getName().c_str(), _name.c_str());
}
}
void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler)
{
std::lock_guard<std::mutex> lock(_mtx);
try
{
// Start all the processor node, input and output ports
for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it)
{
Processor *processor(*it);
if (!processor->isRunning() && processor->getScheduledState() != DISABLED)
{
if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
timeScheduler->schedule(processor);
}
}
for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
{
ProcessGroup *processGroup(*it);
processGroup->startProcessing(timeScheduler);
}
}
catch (std::exception &exception)
{
_logger->log_debug("Caught Exception %s", exception.what());
throw;
}
catch (...)
{
_logger->log_debug("Caught Exception during process group start processing");
throw;
}
}
void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler)
{
std::lock_guard<std::mutex> lock(_mtx);
try
{
// Stop all the processor node, input and output ports
for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it)
{
Processor *processor(*it);
if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
timeScheduler->unschedule(processor);
}
for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
{
ProcessGroup *processGroup(*it);
processGroup->stopProcessing(timeScheduler);
}
}
catch (std::exception &exception)
{
_logger->log_debug("Caught Exception %s", exception.what());
throw;
}
catch (...)
{
_logger->log_debug("Caught Exception during process group stop processing");
throw;
}
}
Processor *ProcessGroup::findProcessor(uuid_t uuid)
{
Processor *ret = NULL;
// std::lock_guard<std::mutex> lock(_mtx);
for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it)
{
Processor *processor(*it);
uuid_t processorUUID;
if (processor->getUUID(processorUUID) && uuid_compare(processorUUID, uuid) == 0)
return processor;
}
for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
{
ProcessGroup *processGroup(*it);
Processor *processor = processGroup->findProcessor(uuid);
if (processor)
return processor;
}
return ret;
}
Processor *ProcessGroup::findProcessor(std::string processorName)
{
Processor *ret = NULL;
for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it)
{
Processor *processor(*it);
_logger->log_debug("Current processor is %s", processor->getName().c_str());
if (processor->getName() == processorName)
return processor;
}
for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
{
ProcessGroup *processGroup(*it);
Processor *processor = processGroup->findProcessor(processorName);
if (processor)
return processor;
}
return ret;
}
void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue)
{
std::lock_guard<std::mutex> lock(_mtx);
for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it)
{
Processor *processor(*it);
if (processor->getName() == processorName)
{
processor->setProperty(propertyName, propertyValue);
}
}
for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
{
ProcessGroup *processGroup(*it);
processGroup->updatePropertyValue(processorName, propertyName, propertyValue);
}
return;
}
void ProcessGroup::addConnection(Connection *connection)
{
std::lock_guard<std::mutex> lock(_mtx);
if (_connections.find(connection) == _connections.end())
{
// We do not have the same connection in this process group yet
_connections.insert(connection);
_logger->log_info("Add connection %s into process group %s",
connection->getName().c_str(), _name.c_str());
uuid_t sourceUUID;
Processor *source = NULL;
connection->getSourceProcessorUUID(sourceUUID);
source = this->findProcessor(sourceUUID);
if (source)
source->addConnection(connection);
Processor *destination = NULL;
uuid_t destinationUUID;
connection->getDestinationProcessorUUID(destinationUUID);
destination = this->findProcessor(destinationUUID);
if (destination && destination != source)
destination->addConnection(connection);
}
}
void ProcessGroup::removeConnection(Connection *connection)
{
std::lock_guard<std::mutex> lock(_mtx);
if (_connections.find(connection) != _connections.end())
{
// We do not have the same connection in this process group yet
_connections.erase(connection);
_logger->log_info("Remove connection %s into process group %s",
connection->getName().c_str(), _name.c_str());
uuid_t sourceUUID;
Processor *source = NULL;
connection->getSourceProcessorUUID(sourceUUID);
source = this->findProcessor(sourceUUID);
if (source)
source->removeConnection(connection);
Processor *destination = NULL;
uuid_t destinationUUID;
connection->getDestinationProcessorUUID(destinationUUID);
destination = this->findProcessor(destinationUUID);
if (destination && destination != source)
destination->removeConnection(connection);
}
}