blob: b176a12793482f77492170ea1a1531ebcabb0dbb [file] [log] [blame]
/**
* @file FlowController.cpp
* FlowController class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sys/time.h>
#include <time.h>
#include <chrono>
#include <thread>
#include <libxml/parser.h>
#include <libxml/tree.h>
#include "FlowController.h"
#include "ProcessContext.h"
FlowController::FlowController(std::string name)
: _name(name)
{
uuid_generate(_uuid);
// Setup the default values
_configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME;
_maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
_maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
_running = false;
_initialized = false;
_root = NULL;
_logger = Logger::getLogger();
_protocol = new FlowControlProtocol(this);
// NiFi config properties
_configure = Configure::getConfigure();
std::string rawConfigFileString;
_configure->get(Configure::nifi_flow_configuration_file, rawConfigFileString);
if (!rawConfigFileString.empty())
{
_configurationFileName = rawConfigFileString;
}
char *path = NULL;
char full_path[PATH_MAX];
std::string adjustedFilename;
if (!_configurationFileName.empty())
{
// perform a naive determination if this is a relative path
if (_configurationFileName.c_str()[0] != '/')
{
adjustedFilename = adjustedFilename + _configure->getHome() + "/" + _configurationFileName;
}
else
{
adjustedFilename = _configurationFileName;
}
}
path = realpath(adjustedFilename.c_str(), full_path);
if (!path)
{
_logger->log_error("Could not locate path from provided configuration file name.");
}
char *flowPath = NULL;
char flow_full_path[PATH_MAX];
std::string pathString(path);
_configurationFileName = pathString;
_logger->log_info("FlowController NiFi Configuration file %s", pathString.c_str());
// Create repos for flow record and provenance
_logger->log_info("FlowController %s created", _name.c_str());
}
FlowController::~FlowController()
{
stop(true);
unload();
delete _protocol;
}
bool FlowController::isRunning()
{
return (_running);
}
bool FlowController::isInitialized()
{
return (_initialized);
}
void FlowController::stop(bool force)
{
if (_running)
{
_logger->log_info("Stop Flow Controller");
this->_timerScheduler.stop();
// Wait for sometime for thread stop
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
if (this->_root)
this->_root->stopProcessing(&this->_timerScheduler);
_running = false;
}
}
void FlowController::unload()
{
if (_running)
{
stop(true);
}
if (_initialized)
{
_logger->log_info("Unload Flow Controller");
if (_root)
delete _root;
_root = NULL;
_initialized = false;
_name = "";
}
return;
}
void FlowController::reload(std::string xmlFile)
{
_logger->log_info("Starting to reload Flow Controller with xml %s", xmlFile.c_str());
stop(true);
unload();
std::string oldxmlFile = this->_configurationFileName;
this->_configurationFileName = xmlFile;
load(ConfigFormat::XML);
start();
if (!this->_root)
{
this->_configurationFileName = oldxmlFile;
_logger->log_info("Rollback Flow Controller to xml %s", oldxmlFile.c_str());
stop(true);
unload();
load(ConfigFormat::XML);
start();
}
}
Processor *FlowController::createProcessor(std::string name, uuid_t uuid)
{
Processor *processor = NULL;
if (name == GenerateFlowFile::ProcessorName)
{
processor = new GenerateFlowFile(name, uuid);
}
else if (name == LogAttribute::ProcessorName)
{
processor = new LogAttribute(name, uuid);
}
else if (name == RealTimeDataCollector::ProcessorName)
{
processor = new RealTimeDataCollector(name, uuid);
}
else if (name == GetFile::ProcessorName)
{
processor = new GetFile(name, uuid);
}
else if (name == TailFile::ProcessorName)
{
processor = new TailFile(name, uuid);
}
else if (name == ListenSyslog::ProcessorName)
{
processor = new ListenSyslog(name, uuid);
}
else
{
_logger->log_error("No Processor defined for %s", name.c_str());
return NULL;
}
//! initialize the processor
processor->initialize();
return processor;
}
ProcessGroup *FlowController::createRootProcessGroup(std::string name, uuid_t uuid)
{
return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid);
}
ProcessGroup *FlowController::createRemoteProcessGroup(std::string name, uuid_t uuid)
{
return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid);
}
Connection *FlowController::createConnection(std::string name, uuid_t uuid)
{
return new Connection(name, uuid);
}
void FlowController::parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *parent)
{
uuid_t uuid;
xmlNode *currentNode;
Connection *connection = NULL;
if (!parent)
{
_logger->log_error("parseProcessNode: no parent group existed");
return;
}
// generate the random UIID
uuid_generate(uuid);
for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next)
{
if (currentNode->type == XML_ELEMENT_NODE)
{
if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0)
{
char *id = (char *) xmlNodeGetContent(currentNode);
if (id) {
_logger->log_debug("parseConnection: id => [%s]", id);
uuid_parse(id, uuid);
xmlFree(id);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
char *name = (char *) xmlNodeGetContent(currentNode);
if (name) {
_logger->log_debug("parseConnection: name => [%s]", name);
connection = this->createConnection(name, uuid);
if (connection == NULL) {
xmlFree(name);
return;
}
xmlFree(name);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "sourceId") == 0) {
char *id = (char *) xmlNodeGetContent(currentNode);
if (id) {
_logger->log_debug("parseConnection: sourceId => [%s]", id);
uuid_parse(id, uuid);
xmlFree(id);
if (connection)
connection->setSourceProcessorUUID(uuid);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "destinationId") == 0) {
char *id = (char *) xmlNodeGetContent(currentNode);
if (id) {
_logger->log_debug("parseConnection: destinationId => [%s]", id);
uuid_parse(id, uuid);
xmlFree(id);
if (connection)
connection->setDestinationProcessorUUID(uuid);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueSize") == 0) {
char *temp = (char *) xmlNodeGetContent(currentNode);
int64_t maxWorkQueueSize = 0;
if (temp) {
if (Property::StringToInt(temp, maxWorkQueueSize)) {
_logger->log_debug("parseConnection: maxWorkQueueSize => [%d]", maxWorkQueueSize);
if (connection)
connection->setMaxQueueSize(maxWorkQueueSize);
}
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueDataSize") == 0) {
char *temp = (char *) xmlNodeGetContent(currentNode);
int64_t maxWorkQueueDataSize = 0;
if (temp) {
if (Property::StringToInt(temp, maxWorkQueueDataSize)) {
_logger->log_debug("parseConnection: maxWorkQueueDataSize => [%d]", maxWorkQueueDataSize);
if (connection)
connection->setMaxQueueDataSize(maxWorkQueueDataSize);
}
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "relationship") == 0) {
char *temp = (char *) xmlNodeGetContent(currentNode);
if (temp) {
std::string relationshipName = temp;
if (!relationshipName.empty()) {
Relationship relationship(relationshipName, "");
_logger->log_debug("parseConnection: relationship => [%s]", relationshipName.c_str());
if (connection)
connection->setRelationship(relationship);
} else {
Relationship empty;
_logger->log_debug("parseConnection: relationship => [%s]", empty.getName().c_str());
if (connection)
connection->setRelationship(empty);
}
xmlFree(temp);
}
}
} // if (currentNode->type == XML_ELEMENT_NODE)
} // for node
if (connection)
parent->addConnection(connection);
return;
}
void FlowController::parseRootProcessGroup(xmlDoc *doc, xmlNode *node) {
uuid_t uuid;
xmlNode *currentNode;
ProcessGroup *group = NULL;
// generate the random UIID
uuid_generate(uuid);
for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
if (currentNode->type == XML_ELEMENT_NODE) {
if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
char *id = (char *) xmlNodeGetContent(currentNode);
if (id) {
_logger->log_debug("parseRootProcessGroup: id => [%s]", id);
uuid_parse(id, uuid);
xmlFree(id);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
char *name = (char *) xmlNodeGetContent(currentNode);
if (name) {
_logger->log_debug("parseRootProcessGroup: name => [%s]", name);
group = this->createRootProcessGroup(name, uuid);
if (group == NULL) {
xmlFree(name);
return;
}
// Set the root process group
this->_root = group;
this->_name = name;
xmlFree(name);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "processor") == 0) {
this->parseProcessorNode(doc, currentNode, group);
} else if (xmlStrcmp(currentNode->name, BAD_CAST "connection") == 0) {
this->parseConnection(doc, currentNode, group);
} else if (xmlStrcmp(currentNode->name, BAD_CAST "remoteProcessGroup") == 0) {
this->parseRemoteProcessGroup(doc, currentNode, group);
}
} // if (currentNode->type == XML_ELEMENT_NODE)
} // for node
}
void FlowController::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
uuid_t uuid;
ProcessGroup *group = NULL;
// generate the random UIID
uuid_generate(uuid);
std::string flowName = rootFlowNode["name"].as<std::string>();
char uuidStr[37];
uuid_unparse(_uuid, uuidStr);
_logger->log_debug("parseRootProcessGroup: id => [%s]", uuidStr);
_logger->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str());
group = this->createRootProcessGroup(flowName, uuid);
this->_root = group;
this->_name = flowName;
}
void FlowController::parseProcessorNodeYaml(YAML::Node processorsNode, ProcessGroup *parentGroup) {
int64_t schedulingPeriod = -1;
int64_t penalizationPeriod = -1;
int64_t yieldPeriod = -1;
int64_t runDurationNanos = -1;
uuid_t uuid;
Processor *processor = NULL;
if (!parentGroup) {
_logger->log_error("parseProcessNodeYaml: no parent group exists");
return;
}
if (processorsNode) {
// Evaluate sequence of processors
int numProcessors = processorsNode.size();
if (numProcessors < 1) {
throw new std::invalid_argument("There must be at least one processor configured.");
}
std::vector<ProcessorConfig> processorConfigs;
if (processorsNode.IsSequence()) {
for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) {
ProcessorConfig procCfg;
YAML::Node procNode = iter->as<YAML::Node>();
procCfg.name = procNode["name"].as<std::string>();
_logger->log_debug("parseProcessorNode: name => [%s]", procCfg.name.c_str());
procCfg.javaClass = procNode["class"].as<std::string>();
_logger->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass.c_str());
char uuidStr[37];
uuid_unparse(_uuid, uuidStr);
// generate the random UUID
uuid_generate(uuid);
// Determine the processor name only from the Java class
int lastOfIdx = procCfg.javaClass.find_last_of(".");
if (lastOfIdx != std::string::npos) {
lastOfIdx++; // if a value is found, increment to move beyond the .
int nameLength = procCfg.javaClass.length() - lastOfIdx;
std::string processorName = procCfg.javaClass.substr(lastOfIdx, nameLength);
processor = this->createProcessor(processorName, uuid);
}
if (!processor) {
_logger->log_error("Could not create a processor %s with name %s", procCfg.name.c_str(), uuidStr);
throw std::invalid_argument("Could not create processor " + procCfg.name);
}
processor->setName(procCfg.name);
procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>();
_logger->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks.c_str());
procCfg.schedulingStrategy = procNode["scheduling strategy"].as<std::string>();
_logger->log_debug("parseProcessorNode: scheduling strategy => [%s]",
procCfg.schedulingStrategy.c_str());
procCfg.schedulingPeriod = procNode["scheduling period"].as<std::string>();
_logger->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod.c_str());
procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
_logger->log_debug("parseProcessorNode: penalization period => [%s]",
procCfg.penalizationPeriod.c_str());
procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
_logger->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod.c_str());
procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>();
_logger->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos.c_str());
// handle auto-terminated relationships
YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
std::vector<std::string> rawAutoTerminatedRelationshipValues;
if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull()
&& autoTerminatedSequence.size() > 0) {
for (YAML::const_iterator relIter = autoTerminatedSequence.begin();
relIter != autoTerminatedSequence.end(); ++relIter) {
std::string autoTerminatedRel = relIter->as<std::string>();
rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
}
}
procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
// handle processor properties
YAML::Node propertiesNode = procNode["Properties"];
std::vector<Property> properties;
if (propertiesNode.IsMap() && !propertiesNode.IsNull() && propertiesNode.size() > 0) {
std::map<std::string, std::string> propertiesMap = propertiesNode.as<
std::map<std::string, std::string>>();
for (std::map<std::string, std::string>::iterator propsIter = propertiesMap.begin();
propsIter != propertiesMap.end(); propsIter++) {
std::string propertyName = propsIter->first;
std::string propertyValue = propsIter->second;
if (!processor->setProperty(propertyName, propertyValue)) {
_logger->log_warn(
"Received property %s with value %s but is not one of the properties for %s",
propertyName.c_str(), propertyValue.c_str(), procCfg.name.c_str());
}
}
}
// Take care of scheduling
TimeUnit unit;
if (Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit)
&& Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
_logger->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod);
processor->setSchedulingPeriodNano(schedulingPeriod);
}
if (Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit)
&& Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
_logger->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms",
penalizationPeriod);
processor->setPenalizationPeriodMsec(penalizationPeriod);
}
if (Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit)
&& Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
_logger->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
processor->setYieldPeriodMsec(yieldPeriod);
}
// Default to running
processor->setScheduledState(RUNNING);
if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
processor->setSchedulingStrategy(TIMER_DRIVEN);
_logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str());
} else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
processor->setSchedulingStrategy(EVENT_DRIVEN);
_logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str());
} else {
processor->setSchedulingStrategy(CRON_DRIVEN);
_logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str());
}
int64_t maxConcurrentTasks;
if (Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) {
_logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
processor->setMaxConcurrentTasks(maxConcurrentTasks);
}
if (Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
_logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
processor->setRunDurationNano(runDurationNanos);
}
std::set<Relationship> autoTerminatedRelationships;
for (auto&& relString : procCfg.autoTerminatedRelationships) {
Relationship relationship(relString, "");
_logger->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", relString.c_str());
autoTerminatedRelationships.insert(relationship);
}
processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
parentGroup->addProcessor(processor);
}
}
} else {
throw new std::invalid_argument(
"Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
}
}
void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, ProcessGroup *parentGroup) {
uuid_t uuid;
if (!parentGroup) {
_logger->log_error("parseRemoteProcessGroupYaml: no parent group exists");
return;
}
if (rpgNode) {
if (rpgNode->IsSequence()) {
for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) {
YAML::Node rpgNode = iter->as<YAML::Node>();
auto name = rpgNode["name"].as<std::string>();
_logger->log_debug("parseRemoteProcessGroupYaml: name => [%s]", name.c_str());
std::string url = rpgNode["url"].as<std::string>();
_logger->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url.c_str());
std::string timeout = rpgNode["timeout"].as<std::string>();
_logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout.c_str());
std::string yieldPeriod = rpgNode["yield period"].as<std::string>();
_logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", yieldPeriod.c_str());
YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>();
if (inputPorts.IsSequence()) {
for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
_logger->log_debug("Got a current port, iterating...");
YAML::Node currPort = portIter->as<YAML::Node>();
this->parsePortYaml(&currPort, parentGroup, SEND);
} // for node
char uuidStr[37];
uuid_unparse(_uuid, uuidStr);
// generate the random UUID
uuid_generate(uuid);
int64_t timeoutValue = -1;
int64_t yieldPeriodValue = -1;
ProcessGroup* group = this->createRemoteProcessGroup(name.c_str(), uuid);
group->setParent(parentGroup);
parentGroup->addProcessGroup(group);
TimeUnit unit;
if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
&& Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) {
_logger->log_debug("parseRemoteProcessGroup: yieldPeriod => [%d] ms", yieldPeriod.c_str());
group->setYieldPeriodMsec(yieldPeriodValue);
}
if (Property::StringToTime(timeout, timeoutValue, unit)
&& Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
_logger->log_debug("parseRemoteProcessGroup: timeoutValue => [%d] ms", timeout.c_str());
group->setTimeOut(yieldPeriodValue);
}
group->setTransmitting(true);
group->setURL(url);
}
}
}
}
}
void FlowController::parseConnectionYaml(YAML::Node *connectionsNode, ProcessGroup *parent) {
uuid_t uuid;
Connection *connection = NULL;
if (!parent) {
_logger->log_error("parseProcessNode: no parent group was provided");
return;
}
if (connectionsNode) {
int numConnections = connectionsNode->size();
if (numConnections < 1) {
throw new std::invalid_argument("There must be at least one connection configured.");
}
if (connectionsNode->IsSequence()) {
for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
// generate the random UIID
uuid_generate(uuid);
YAML::Node connectionNode = iter->as<YAML::Node>();
std::string name = connectionNode["name"].as<std::string>();
std::string destName = connectionNode["destination name"].as<std::string>();
char uuidStr[37];
uuid_unparse(_uuid, uuidStr);
_logger->log_debug("Created connection with UUID %s and name %s", uuidStr, name.c_str());
connection = this->createConnection(name, uuid);
auto rawRelationship = connectionNode["source relationship name"].as<std::string>();
Relationship relationship(rawRelationship, "");
_logger->log_debug("parseConnection: relationship => [%s]", rawRelationship.c_str());
if (connection)
connection->setRelationship(relationship);
std::string connectionSrcProcName = connectionNode["source name"].as<std::string>();
Processor *srcProcessor = this->_root->findProcessor(connectionSrcProcName);
if (!srcProcessor) {
_logger->log_error("Could not locate a source with name %s to create a connection",
connectionSrcProcName.c_str());
throw std::invalid_argument(
"Could not locate a source with name %s to create a connection " + connectionSrcProcName);
}
Processor *destProcessor = this->_root->findProcessor(destName);
// If we could not find name, try by UUID
if (!destProcessor) {
uuid_t destUuid;
uuid_parse(destName.c_str(), destUuid);
destProcessor = this->_root->findProcessor(destUuid);
}
if (destProcessor) {
std::string destUuid = destProcessor->getUUIDStr();
}
uuid_t srcUuid;
uuid_t destUuid;
srcProcessor->getUUID(srcUuid);
connection->setSourceProcessorUUID(srcUuid);
destProcessor->getUUID(destUuid);
connection->setDestinationProcessorUUID(destUuid);
if (connection) {
parent->addConnection(connection);
}
}
}
if (connection)
parent->addConnection(connection);
return;
}
}
void FlowController::parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, ProcessGroup *parent) {
uuid_t uuid;
xmlNode *currentNode;
ProcessGroup *group = NULL;
int64_t yieldPeriod = -1;
int64_t timeOut = -1;
// generate the random UIID
uuid_generate(uuid);
for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
if (currentNode->type == XML_ELEMENT_NODE) {
if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
char *id = (char *) xmlNodeGetContent(currentNode);
if (id) {
_logger->log_debug("parseRootProcessGroup: id => [%s]", id);
uuid_parse(id, uuid);
xmlFree(id);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
char *name = (char *) xmlNodeGetContent(currentNode);
if (name) {
_logger->log_debug("parseRemoteProcessGroup: name => [%s]", name);
group = this->createRemoteProcessGroup(name, uuid);
if (group == NULL) {
xmlFree(name);
return;
}
group->setParent(parent);
parent->addProcessGroup(group);
xmlFree(name);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) {
TimeUnit unit;
char *temp = (char *) xmlNodeGetContent(currentNode);
if (temp) {
if (Property::StringToTime(temp, yieldPeriod, unit)
&& Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod) && group) {
_logger->log_debug("parseRemoteProcessGroup: yieldPeriod => [%d] ms", yieldPeriod);
group->setYieldPeriodMsec(yieldPeriod);
}
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "timeout") == 0) {
TimeUnit unit;
char *temp = (char *) xmlNodeGetContent(currentNode);
if (temp) {
if (Property::StringToTime(temp, timeOut, unit)
&& Property::ConvertTimeUnitToMS(timeOut, unit, timeOut) && group) {
_logger->log_debug("parseRemoteProcessGroup: timeOut => [%d] ms", timeOut);
group->setTimeOut(timeOut);
}
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "transmitting") == 0) {
char *temp = (char *) xmlNodeGetContent(currentNode);
bool transmitting;
if (temp) {
if (Property::StringToBool(temp, transmitting) && group) {
_logger->log_debug("parseRemoteProcessGroup: transmitting => [%d]", transmitting);
group->setTransmitting(transmitting);
}
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "inputPort") == 0 && group) {
this->parsePort(doc, currentNode, group, SEND);
} else if (xmlStrcmp(currentNode->name, BAD_CAST "outputPort") == 0 && group) {
this->parsePort(doc, currentNode, group, RECEIVE);
}
} // if (currentNode->type == XML_ELEMENT_NODE)
} // for node
}
void FlowController::parseProcessorProperty(xmlDoc *doc, xmlNode *node, Processor *processor) {
xmlNode *currentNode;
std::string propertyValue;
std::string propertyName;
if (!processor) {
_logger->log_error("parseProcessorProperty: no parent processor existed");
return;
}
for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
if (currentNode->type == XML_ELEMENT_NODE) {
if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
char *name = (char *) xmlNodeGetContent(currentNode);
if (name) {
_logger->log_debug("parseProcessorNode: name => [%s]", name);
propertyName = name;
xmlFree(name);
}
}
if (xmlStrcmp(currentNode->name, BAD_CAST "value") == 0) {
char *value = (char *) xmlNodeGetContent(currentNode);
if (value) {
_logger->log_debug("parseProcessorNode: value => [%s]", value);
propertyValue = value;
xmlFree(value);
}
}
if (!propertyName.empty() && !propertyValue.empty()) {
processor->setProperty(propertyName, propertyValue);
}
} // if (currentNode->type == XML_ELEMENT_NODE)
} // for node
}
void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, TransferDirection direction) {
uuid_t uuid;
Processor *processor = NULL;
RemoteProcessorGroupPort *port = NULL;
if (!parent) {
_logger->log_error("parseProcessNode: no parent group existed");
return;
}
YAML::Node inputPortsObj = portNode->as<YAML::Node>();
// generate the random UIID
uuid_generate(uuid);
auto portId = inputPortsObj["id"].as<std::string>();
auto nameStr = inputPortsObj["name"].as<std::string>();
uuid_parse(portId.c_str(), uuid);
port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid);
processor = (Processor *) port;
port->setDirection(direction);
port->setTimeOut(parent->getTimeOut());
port->setTransmitting(true);
processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
processor->initialize();
// handle port properties
YAML::Node nodeVal = portNode->as<YAML::Node>();
YAML::Node propertiesNode = nodeVal["Properties"];
std::vector<Property> properties;
if (propertiesNode.IsMap() && !propertiesNode.IsNull() && propertiesNode.size() > 0) {
std::map<std::string, std::string> propertiesMap = propertiesNode.as<std::map<std::string, std::string>>();
for (std::map<std::string, std::string>::iterator propsIter = propertiesMap.begin();
propsIter != propertiesMap.end(); propsIter++) {
std::string propertyName = propsIter->first;
std::string propertyValue = propsIter->second;
if (!processor->setProperty(propertyName, propertyValue)) {
_logger->log_warn("Received property %s with value %s but is not one of the properties for %s",
propertyName.c_str(), propertyValue.c_str(), nameStr.c_str());
}
}
}
// add processor to parent
parent->addProcessor(processor);
processor->setScheduledState(RUNNING);
auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>();
int64_t maxConcurrentTasks;
if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
processor->setMaxConcurrentTasks(maxConcurrentTasks);
}
_logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
processor->setMaxConcurrentTasks(maxConcurrentTasks);
}
void FlowController::parsePort(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent, TransferDirection direction) {
char *id = NULL;
char *name = NULL;
uuid_t uuid;
xmlNode *currentNode;
Processor *processor = NULL;
RemoteProcessorGroupPort *port = NULL;
if (!parent) {
_logger->log_error("parseProcessNode: no parent group existed");
return;
}
// generate the random UIID
uuid_generate(uuid);
for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
if (currentNode->type == XML_ELEMENT_NODE) {
if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
id = (char *) xmlNodeGetContent(currentNode);
if (id) {
_logger->log_debug("parseProcessorNode: id => [%s]", id);
uuid_parse(id, uuid);
xmlFree(id);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
name = (char *) xmlNodeGetContent(currentNode);
if (name) {
_logger->log_debug("parseProcessorNode: name => [%s]", name);
port = new RemoteProcessorGroupPort(name, uuid);
processor = (Processor *) port;
if (processor == NULL) {
xmlFree(name);
return;
}
port->setDirection(direction);
port->setTimeOut(parent->getTimeOut());
port->setTransmitting(parent->getTransmitting());
processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
processor->initialize();
// add processor to parent
parent->addProcessor(processor);
xmlFree(name);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) {
char *temp = (char *) xmlNodeGetContent(currentNode);
if (temp) {
std::string state = temp;
if (state == "DISABLED") {
_logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
processor->setScheduledState(DISABLED);
}
if (state == "STOPPED") {
_logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
processor->setScheduledState(STOPPED);
}
if (state == "RUNNING") {
_logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
processor->setScheduledState(RUNNING);
}
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) {
char *temp = (char *) xmlNodeGetContent(currentNode);
if (temp) {
int64_t maxConcurrentTasks;
if (Property::StringToInt(temp, maxConcurrentTasks)) {
_logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
processor->setMaxConcurrentTasks(maxConcurrentTasks);
}
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) {
this->parseProcessorProperty(doc, currentNode, processor);
}
} // if (currentNode->type == XML_ELEMENT_NODE)
} // while node
}
void FlowController::parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent) {
char *id = NULL;
char *name = NULL;
int64_t schedulingPeriod = -1;
int64_t penalizationPeriod = -1;
int64_t yieldPeriod = -1;
bool lossTolerant = false;
int64_t runDurationNanos = -1;
uuid_t uuid;
xmlNode *currentNode;
Processor *processor = NULL;
if (!parent) {
_logger->log_error("parseProcessNode: no parent group existed");
return;
}
// generate the random UIID
uuid_generate(uuid);
for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
if (currentNode->type == XML_ELEMENT_NODE) {
if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
id = (char *) xmlNodeGetContent(currentNode);
if (id) {
_logger->log_debug("parseProcessorNode: id => [%s]", id);
uuid_parse(id, uuid);
xmlFree(id);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) {
name = (char *) xmlNodeGetContent(currentNode);
if (name) {
_logger->log_debug("parseProcessorNode: name => [%s]", name);
processor = this->createProcessor(name, uuid);
if (processor == NULL) {
xmlFree(name);
return;
}
// add processor to parent
parent->addProcessor(processor);
xmlFree(name);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingPeriod") == 0) {
TimeUnit unit;
char *temp = (char *) xmlNodeGetContent(currentNode);
if (temp) {
if (Property::StringToTime(temp, schedulingPeriod, unit)
&& Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
_logger->log_debug("parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod);
processor->setSchedulingPeriodNano(schedulingPeriod);
}
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "penalizationPeriod") == 0) {
TimeUnit unit;
char *temp = (char *) xmlNodeGetContent(currentNode);
if (temp) {
if (Property::StringToTime(temp, penalizationPeriod, unit)
&& Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
_logger->log_debug("parseProcessorNode: penalizationPeriod => [%d] ms", penalizationPeriod);
processor->setPenalizationPeriodMsec(penalizationPeriod);
}
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) {
TimeUnit unit;
char *temp = (char *) xmlNodeGetContent(currentNode);
if (temp) {
if (Property::StringToTime(temp, yieldPeriod, unit)
&& Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
_logger->log_debug("parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
processor->setYieldPeriodMsec(yieldPeriod);
}
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "lossTolerant") == 0) {
char *temp = (char *) xmlNodeGetContent(currentNode);
if (temp) {
if (Property::StringToBool(temp, lossTolerant)) {
_logger->log_debug("parseProcessorNode: lossTolerant => [%d]", lossTolerant);
processor->setlossTolerant(lossTolerant);
}
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) {
char *temp = (char *) xmlNodeGetContent(currentNode);
if (temp) {
std::string state = temp;
if (state == "DISABLED") {
_logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
processor->setScheduledState(DISABLED);
}
if (state == "STOPPED") {
_logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
processor->setScheduledState(STOPPED);
}
if (state == "RUNNING") {
_logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str());
processor->setScheduledState(RUNNING);
}
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingStrategy") == 0) {
char *temp = (char *) xmlNodeGetContent(currentNode);
if (temp) {
std::string strategy = temp;
if (strategy == "TIMER_DRIVEN") {
_logger->log_debug("parseProcessorNode: scheduledStrategy => [%s]", strategy.c_str());
processor->setSchedulingStrategy(TIMER_DRIVEN);
}
if (strategy == "EVENT_DRIVEN") {
_logger->log_debug("parseProcessorNode: scheduledStrategy => [%s]", strategy.c_str());
processor->setSchedulingStrategy(EVENT_DRIVEN);
}
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) {
char *temp = (char *) xmlNodeGetContent(currentNode);
if (temp) {
int64_t maxConcurrentTasks;
if (Property::StringToInt(temp, maxConcurrentTasks)) {
_logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
processor->setMaxConcurrentTasks(maxConcurrentTasks);
}
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "runDurationNanos") == 0) {
char *temp = (char *) xmlNodeGetContent(currentNode);
if (temp) {
if (Property::StringToInt(temp, runDurationNanos)) {
_logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
processor->setRunDurationNano(runDurationNanos);
}
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "autoTerminatedRelationship") == 0) {
char *temp = (char *) xmlNodeGetContent(currentNode);
if (temp) {
std::string relationshipName = temp;
Relationship relationship(relationshipName, "");
std::set<Relationship> relationships;
relationships.insert(relationship);
processor->setAutoTerminatedRelationships(relationships);
_logger->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]",
relationshipName.c_str());
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) {
this->parseProcessorProperty(doc, currentNode, processor);
}
} // if (currentNode->type == XML_ELEMENT_NODE)
} // while node
}
void FlowController::load(ConfigFormat configFormat) {
if (_running) {
stop(true);
}
if (!_initialized) {
_logger->log_info("Load Flow Controller from file %s", _configurationFileName.c_str());
if (ConfigFormat::XML == configFormat) {
_logger->log_info("Detected an XML configuration file for processing.");
xmlDoc *doc = xmlReadFile(_configurationFileName.c_str(), NULL, XML_PARSE_NONET);
if (doc == NULL) {
_logger->log_error("xmlReadFile returned NULL when reading [%s]", _configurationFileName.c_str());
_initialized = true;
return;
}
xmlNode *root = xmlDocGetRootElement(doc);
if (root == NULL) {
_logger->log_error("Can not get root from XML doc %s", _configurationFileName.c_str());
xmlFreeDoc(doc);
xmlCleanupParser();
}
if (xmlStrcmp(root->name, BAD_CAST "flowController") != 0) {
_logger->log_error("Root name is not flowController for XML doc %s", _configurationFileName.c_str());
xmlFreeDoc(doc);
xmlCleanupParser();
return;
}
xmlNode *currentNode;
for (currentNode = root->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) {
if (currentNode->type == XML_ELEMENT_NODE) {
if (xmlStrcmp(currentNode->name, BAD_CAST "rootGroup") == 0) {
this->parseRootProcessGroup(doc, currentNode);
} else if (xmlStrcmp(currentNode->name, BAD_CAST "maxTimerDrivenThreadCount") == 0) {
char *temp = (char *) xmlNodeGetContent(currentNode);
int64_t maxTimerDrivenThreadCount;
if (temp) {
if (Property::StringToInt(temp, maxTimerDrivenThreadCount)) {
_logger->log_debug("maxTimerDrivenThreadCount => [%d]", maxTimerDrivenThreadCount);
this->_maxTimerDrivenThreads = maxTimerDrivenThreadCount;
}
xmlFree(temp);
}
} else if (xmlStrcmp(currentNode->name, BAD_CAST "maxEventDrivenThreadCount") == 0) {
char *temp = (char *) xmlNodeGetContent(currentNode);
int64_t maxEventDrivenThreadCount;
if (temp) {
if (Property::StringToInt(temp, maxEventDrivenThreadCount)) {
_logger->log_debug("maxEventDrivenThreadCount => [%d]", maxEventDrivenThreadCount);
this->_maxEventDrivenThreads = maxEventDrivenThreadCount;
}
xmlFree(temp);
}
}
} // type == XML_ELEMENT_NODE
} // for
xmlFreeDoc(doc);
xmlCleanupParser();
_initialized = true;
} else if (ConfigFormat::YAML == configFormat) {
YAML::Node flow = YAML::LoadFile(_configurationFileName);
YAML::Node flowControllerNode = flow["Flow Controller"];
YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY];
YAML::Node connectionsNode = flow["Connections"];
YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"];
// Create the root process group
parseRootProcessGroupYaml(flowControllerNode);
parseProcessorNodeYaml(processorsNode, this->_root);
parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, this->_root);
parseConnectionYaml(&connectionsNode, this->_root);
_initialized = true;
}
}
}
bool FlowController::start() {
if (!_initialized) {
_logger->log_error("Can not start Flow Controller because it has not been initialized");
return false;
} else {
if (!_running) {
_logger->log_info("Start Flow Controller");
this->_timerScheduler.start();
if (this->_root)
this->_root->startProcessing(&this->_timerScheduler);
_running = true;
this->_protocol->start();
}
return true;
}
}