| /** |
| * @file FlowController.cpp |
| * FlowController class implementation |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include <vector> |
| #include <queue> |
| #include <map> |
| #include <set> |
| #include <sys/time.h> |
| #include <time.h> |
| #include <chrono> |
| #include <thread> |
| #include <libxml/parser.h> |
| #include <libxml/tree.h> |
| |
| #include "FlowController.h" |
| #include "ProcessContext.h" |
| |
| FlowController::FlowController(std::string name) |
| : _name(name) |
| { |
| uuid_generate(_uuid); |
| |
| // Setup the default values |
| _configurationFileName = DEFAULT_FLOW_XML_FILE_NAME; |
| _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD; |
| _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD; |
| _running = false; |
| _initialized = false; |
| _root = NULL; |
| _logger = Logger::getLogger(); |
| _protocol = new FlowControlProtocol(this); |
| |
| // NiFi config properties |
| _configure = Configure::getConfigure(); |
| _configure->get(Configure::nifi_flow_configuration_file, _configurationFileName); |
| _logger->log_info("FlowController NiFi XML file %s", _configurationFileName.c_str()); |
| // Create repos for flow record and provenance |
| |
| _logger->log_info("FlowController %s created", _name.c_str()); |
| } |
| |
| FlowController::~FlowController() |
| { |
| stop(true); |
| unload(); |
| delete _protocol; |
| } |
| |
| bool FlowController::isRunning() |
| { |
| return (_running); |
| } |
| |
| bool FlowController::isInitialized() |
| { |
| return (_initialized); |
| } |
| |
| void FlowController::stop(bool force) |
| { |
| if (_running) |
| { |
| _logger->log_info("Stop Flow Controller"); |
| this->_timerScheduler.stop(); |
| // Wait for sometime for thread stop |
| std::this_thread::sleep_for(std::chrono::milliseconds(1000)); |
| if (this->_root) |
| this->_root->stopProcessing(&this->_timerScheduler); |
| _running = false; |
| } |
| } |
| |
| void FlowController::unload() |
| { |
| if (_running) |
| { |
| stop(true); |
| } |
| if (_initialized) |
| { |
| _logger->log_info("Unload Flow Controller"); |
| if (_root) |
| delete _root; |
| _root = NULL; |
| _initialized = false; |
| _name = ""; |
| } |
| |
| return; |
| } |
| |
| void FlowController::reload(std::string xmlFile) |
| { |
| _logger->log_info("Starting to reload Flow Controller with xml %s", xmlFile.c_str()); |
| stop(true); |
| unload(); |
| std::string oldxmlFile = this->_configurationFileName; |
| this->_configurationFileName = xmlFile; |
| load(ConfigFormat::XML); |
| start(); |
| if (!this->_root) |
| { |
| this->_configurationFileName = oldxmlFile; |
| _logger->log_info("Rollback Flow Controller to xml %s", oldxmlFile.c_str()); |
| stop(true); |
| unload(); |
| load(ConfigFormat::XML); |
| start(); |
| } |
| } |
| |
| Processor *FlowController::createProcessor(std::string name, uuid_t uuid) |
| { |
| Processor *processor = NULL; |
| if (name == GenerateFlowFile::ProcessorName) |
| { |
| processor = new GenerateFlowFile(name, uuid); |
| } |
| else if (name == LogAttribute::ProcessorName) |
| { |
| processor = new LogAttribute(name, uuid); |
| } |
| else if (name == RealTimeDataCollector::ProcessorName) |
| { |
| processor = new RealTimeDataCollector(name, uuid); |
| } |
| else if (name == GetFile::ProcessorName) |
| { |
| processor = new GetFile(name, uuid); |
| } |
| else if (name == TailFile::ProcessorName) |
| { |
| processor = new TailFile(name, uuid); |
| } |
| else if (name == ListenSyslog::ProcessorName) |
| { |
| processor = new ListenSyslog(name, uuid); |
| } |
| else |
| { |
| _logger->log_error("No Processor defined for %s", name.c_str()); |
| return NULL; |
| } |
| |
| //! initialize the processor |
| processor->initialize(); |
| |
| return processor; |
| } |
| |
| ProcessGroup *FlowController::createRootProcessGroup(std::string name, uuid_t uuid) |
| { |
| return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid); |
| } |
| |
| ProcessGroup *FlowController::createRemoteProcessGroup(std::string name, uuid_t uuid) |
| { |
| return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid); |
| } |
| |
| Connection *FlowController::createConnection(std::string name, uuid_t uuid) |
| { |
| return new Connection(name, uuid); |
| } |
| |
| void FlowController::parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *parent) |
| { |
| uuid_t uuid; |
| xmlNode *currentNode; |
| Connection *connection = NULL; |
| |
| if (!parent) |
| { |
| _logger->log_error("parseProcessNode: no parent group existed"); |
| return; |
| } |
| |
| // generate the random UIID |
| uuid_generate(uuid); |
| |
| for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) |
| { |
| if (currentNode->type == XML_ELEMENT_NODE) |
| { |
| if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) |
| { |
| char *id = (char *) xmlNodeGetContent(currentNode); |
| if (id) { |
| _logger->log_debug("parseConnection: id => [%s]", id); |
| uuid_parse(id, uuid); |
| xmlFree(id); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { |
| char *name = (char *) xmlNodeGetContent(currentNode); |
| if (name) { |
| _logger->log_debug("parseConnection: name => [%s]", name); |
| connection = this->createConnection(name, uuid); |
| if (connection == NULL) { |
| xmlFree(name); |
| return; |
| } |
| xmlFree(name); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "sourceId") == 0) { |
| char *id = (char *) xmlNodeGetContent(currentNode); |
| if (id) { |
| _logger->log_debug("parseConnection: sourceId => [%s]", id); |
| uuid_parse(id, uuid); |
| xmlFree(id); |
| if (connection) |
| connection->setSourceProcessorUUID(uuid); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "destinationId") == 0) { |
| char *id = (char *) xmlNodeGetContent(currentNode); |
| if (id) { |
| _logger->log_debug("parseConnection: destinationId => [%s]", id); |
| uuid_parse(id, uuid); |
| xmlFree(id); |
| if (connection) |
| connection->setDestinationProcessorUUID(uuid); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueSize") == 0) { |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| int64_t maxWorkQueueSize = 0; |
| if (temp) { |
| if (Property::StringToInt(temp, maxWorkQueueSize)) { |
| _logger->log_debug("parseConnection: maxWorkQueueSize => [%d]", maxWorkQueueSize); |
| if (connection) |
| connection->setMaxQueueSize(maxWorkQueueSize); |
| |
| } |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueDataSize") == 0) { |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| int64_t maxWorkQueueDataSize = 0; |
| if (temp) { |
| if (Property::StringToInt(temp, maxWorkQueueDataSize)) { |
| _logger->log_debug("parseConnection: maxWorkQueueDataSize => [%d]", maxWorkQueueDataSize); |
| if (connection) |
| connection->setMaxQueueDataSize(maxWorkQueueDataSize); |
| |
| } |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "relationship") == 0) { |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| if (temp) { |
| std::string relationshipName = temp; |
| if (!relationshipName.empty()) { |
| Relationship relationship(relationshipName, ""); |
| _logger->log_debug("parseConnection: relationship => [%s]", relationshipName.c_str()); |
| if (connection) |
| connection->setRelationship(relationship); |
| } else { |
| Relationship empty; |
| _logger->log_debug("parseConnection: relationship => [%s]", empty.getName().c_str()); |
| if (connection) |
| connection->setRelationship(empty); |
| } |
| xmlFree(temp); |
| } |
| } |
| } // if (currentNode->type == XML_ELEMENT_NODE) |
| } // for node |
| |
| if (connection) |
| parent->addConnection(connection); |
| |
| return; |
| } |
| |
| void FlowController::parseRootProcessGroup(xmlDoc *doc, xmlNode *node) { |
| uuid_t uuid; |
| xmlNode *currentNode; |
| ProcessGroup *group = NULL; |
| |
| // generate the random UIID |
| uuid_generate(uuid); |
| |
| for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { |
| if (currentNode->type == XML_ELEMENT_NODE) { |
| if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) { |
| char *id = (char *) xmlNodeGetContent(currentNode); |
| if (id) { |
| _logger->log_debug("parseRootProcessGroup: id => [%s]", id); |
| uuid_parse(id, uuid); |
| xmlFree(id); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { |
| char *name = (char *) xmlNodeGetContent(currentNode); |
| if (name) { |
| _logger->log_debug("parseRootProcessGroup: name => [%s]", name); |
| group = this->createRootProcessGroup(name, uuid); |
| if (group == NULL) { |
| xmlFree(name); |
| return; |
| } |
| // Set the root process group |
| this->_root = group; |
| this->_name = name; |
| xmlFree(name); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "processor") == 0) { |
| this->parseProcessorNode(doc, currentNode, group); |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "connection") == 0) { |
| this->parseConnection(doc, currentNode, group); |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "remoteProcessGroup") == 0) { |
| this->parseRemoteProcessGroup(doc, currentNode, group); |
| } |
| } // if (currentNode->type == XML_ELEMENT_NODE) |
| } // for node |
| } |
| |
| void FlowController::parseRootProcessGroupYaml(YAML::Node rootFlowNode) { |
| uuid_t uuid; |
| ProcessGroup *group = NULL; |
| |
| // generate the random UIID |
| uuid_generate(uuid); |
| |
| std::string flowName = rootFlowNode["name"].as<std::string>(); |
| |
| char uuidStr[37]; |
| uuid_unparse(_uuid, uuidStr); |
| _logger->log_debug("parseRootProcessGroup: id => [%s]", uuidStr); |
| _logger->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str()); |
| group = this->createRootProcessGroup(flowName, uuid); |
| this->_root = group; |
| this->_name = flowName; |
| } |
| |
| void FlowController::parseProcessorNodeYaml(YAML::Node processorsNode, ProcessGroup *parentGroup) { |
| int64_t schedulingPeriod = -1; |
| int64_t penalizationPeriod = -1; |
| int64_t yieldPeriod = -1; |
| int64_t runDurationNanos = -1; |
| uuid_t uuid; |
| Processor *processor = NULL; |
| |
| if (!parentGroup) { |
| _logger->log_error("parseProcessNodeYaml: no parent group exists"); |
| return; |
| } |
| |
| if (processorsNode) { |
| // Evaluate sequence of processors |
| int numProcessors = processorsNode.size(); |
| if (numProcessors < 1) { |
| throw new std::invalid_argument("There must be at least one processor configured."); |
| } |
| |
| std::vector<ProcessorConfig> processorConfigs; |
| |
| if (processorsNode.IsSequence()) { |
| for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) { |
| ProcessorConfig procCfg; |
| YAML::Node procNode = iter->as<YAML::Node>(); |
| |
| procCfg.name = procNode["name"].as<std::string>(); |
| _logger->log_debug("parseProcessorNode: name => [%s]", procCfg.name.c_str()); |
| procCfg.javaClass = procNode["class"].as<std::string>(); |
| _logger->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass.c_str()); |
| |
| char uuidStr[37]; |
| uuid_unparse(_uuid, uuidStr); |
| |
| // generate the random UUID |
| uuid_generate(uuid); |
| |
| // Determine the processor name only from the Java class |
| int lastOfIdx = procCfg.javaClass.find_last_of("."); |
| if (lastOfIdx != std::string::npos) { |
| lastOfIdx++; // if a value is found, increment to move beyond the . |
| int nameLength = procCfg.javaClass.length() - lastOfIdx; |
| std::string processorName = procCfg.javaClass.substr(lastOfIdx, nameLength); |
| processor = this->createProcessor(processorName, uuid); |
| } |
| |
| if (!processor) { |
| _logger->log_error("Could not create a processor %s with name %s", procCfg.name.c_str(), uuidStr); |
| throw std::invalid_argument("Could not create processor " + procCfg.name); |
| } |
| processor->setName(procCfg.name); |
| |
| procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>(); |
| _logger->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks.c_str()); |
| procCfg.schedulingStrategy = procNode["scheduling strategy"].as<std::string>(); |
| _logger->log_debug("parseProcessorNode: scheduling strategy => [%s]", |
| procCfg.schedulingStrategy.c_str()); |
| procCfg.schedulingPeriod = procNode["scheduling period"].as<std::string>(); |
| _logger->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod.c_str()); |
| procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>(); |
| _logger->log_debug("parseProcessorNode: penalization period => [%s]", |
| procCfg.penalizationPeriod.c_str()); |
| procCfg.yieldPeriod = procNode["yield period"].as<std::string>(); |
| _logger->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod.c_str()); |
| procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>(); |
| _logger->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos.c_str()); |
| |
| // handle auto-terminated relationships |
| YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"]; |
| std::vector<std::string> rawAutoTerminatedRelationshipValues; |
| if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() |
| && autoTerminatedSequence.size() > 0) { |
| _logger->log_debug("Found non-empty auto terminated sequence... interpreting."); |
| for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); |
| relIter != autoTerminatedSequence.end(); ++relIter) { |
| std::string autoTerminatedRel = relIter->as<std::string>(); |
| _logger->log_debug("Auto terminating relationship %s", autoTerminatedRel.c_str()); |
| rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel); |
| } |
| } else { |
| _logger->log_debug("no relationships are auto terminated here..."); |
| } |
| procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues; |
| |
| // handle processor properties |
| YAML::Node propertiesNode = procNode["Properties"]; |
| std::vector<Property> properties; |
| |
| if (propertiesNode.IsMap() && !propertiesNode.IsNull() && propertiesNode.size() > 0) { |
| std::map<std::string, std::string> propertiesMap = propertiesNode.as< |
| std::map<std::string, std::string>>(); |
| for (std::map<std::string, std::string>::iterator propsIter = propertiesMap.begin(); |
| propsIter != propertiesMap.end(); propsIter++) { |
| std::string propertyName = propsIter->first; |
| std::string propertyValue = propsIter->second; |
| if (!processor->setProperty(propertyName, propertyValue)) { |
| _logger->log_warn( |
| "Received property %s with value %s but is not one of the properties for %s", |
| propertyName.c_str(), propertyValue.c_str(), procCfg.name.c_str()); |
| } |
| } |
| } |
| |
| // Take care of scheduling |
| TimeUnit unit; |
| if (Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) |
| && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { |
| _logger->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod); |
| processor->setSchedulingPeriodNano(schedulingPeriod); |
| } |
| |
| if (Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) |
| && Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) { |
| _logger->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms", |
| penalizationPeriod); |
| processor->setPenalizationPeriodMsec(penalizationPeriod); |
| } |
| |
| if (Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) |
| && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) { |
| _logger->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod); |
| processor->setYieldPeriodMsec(yieldPeriod); |
| } |
| |
| // Default to running |
| processor->setScheduledState(RUNNING); |
| |
| if (procCfg.schedulingStrategy == "TIMER_DRIVEN") { |
| processor->setSchedulingStrategy(TIMER_DRIVEN); |
| _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str()); |
| } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") { |
| processor->setSchedulingStrategy(EVENT_DRIVEN); |
| _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str()); |
| } else { |
| processor->setSchedulingStrategy(CRON_DRIVEN); |
| _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str()); |
| |
| } |
| |
| int64_t maxConcurrentTasks; |
| if (Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) { |
| _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); |
| processor->setMaxConcurrentTasks(maxConcurrentTasks); |
| } |
| |
| if (Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) { |
| _logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos); |
| processor->setRunDurationNano(runDurationNanos); |
| } |
| |
| std::set<Relationship> autoTerminatedRelationships; |
| for (auto&& relString : procCfg.autoTerminatedRelationships) { |
| Relationship relationship(relString, ""); |
| _logger->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", relString.c_str()); |
| autoTerminatedRelationships.insert(relationship); |
| } |
| |
| processor->setAutoTerminatedRelationships(autoTerminatedRelationships); |
| |
| parentGroup->addProcessor(processor); |
| } |
| } |
| } else { |
| throw new std::invalid_argument( |
| "Cannot instantiate a MiNiFi instance without a defined Processors configuration node."); |
| } |
| } |
| |
| void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, ProcessGroup *parentGroup) { |
| uuid_t uuid; |
| |
| if (!parentGroup) { |
| _logger->log_error("parseRemoteProcessGroupYaml: no parent group exists"); |
| return; |
| } |
| |
| if (rpgNode) { |
| if (rpgNode->IsSequence()) { |
| for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) { |
| YAML::Node rpgNode = iter->as<YAML::Node>(); |
| |
| auto name = rpgNode["name"].as<std::string>(); |
| _logger->log_debug("parseRemoteProcessGroupYaml: name => [%s]", name.c_str()); |
| |
| std::string url = rpgNode["url"].as<std::string>(); |
| _logger->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url.c_str()); |
| |
| std::string timeout = rpgNode["timeout"].as<std::string>(); |
| _logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout.c_str()); |
| |
| std::string yieldPeriod = rpgNode["yield period"].as<std::string>(); |
| _logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", yieldPeriod.c_str()); |
| |
| YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>(); |
| |
| if (inputPorts.IsSequence()) { |
| for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) { |
| _logger->log_debug("Got a current port, iterating..."); |
| |
| YAML::Node currPort = portIter->as<YAML::Node>(); |
| |
| this->parsePortYaml(&currPort, parentGroup, SEND); |
| |
| } // for node |
| char uuidStr[37]; |
| uuid_unparse(_uuid, uuidStr); |
| |
| // generate the random UUID |
| uuid_generate(uuid); |
| |
| int64_t timeoutValue = -1; |
| int64_t yieldPeriodValue = -1; |
| |
| ProcessGroup* group = this->createRemoteProcessGroup(name.c_str(), uuid); |
| group->setParent(parentGroup); |
| parentGroup->addProcessGroup(group); |
| |
| TimeUnit unit; |
| |
| if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) |
| && Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) { |
| _logger->log_debug("parseRemoteProcessGroup: yieldPeriod => [%d] ms", yieldPeriod.c_str()); |
| group->setYieldPeriodMsec(yieldPeriodValue); |
| } |
| |
| if (Property::StringToTime(timeout, timeoutValue, unit) |
| && Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) { |
| _logger->log_debug("parseRemoteProcessGroup: timeoutValue => [%d] ms", timeout.c_str()); |
| group->setTimeOut(yieldPeriodValue); |
| } |
| |
| group->setTransmitting(true); |
| group->setURL(url); |
| |
| } |
| } |
| } |
| } |
| } |
| |
| void FlowController::parseConnectionYaml(YAML::Node *connectionsNode, ProcessGroup *parent) { |
| uuid_t uuid; |
| Connection *connection = NULL; |
| |
| if (!parent) { |
| _logger->log_error("parseProcessNode: no parent group was provided"); |
| return; |
| } |
| |
| if (connectionsNode) { |
| int numConnections = connectionsNode->size(); |
| if (numConnections < 1) { |
| throw new std::invalid_argument("There must be at least one connection configured."); |
| } |
| |
| if (connectionsNode->IsSequence()) { |
| for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) { |
| // generate the random UIID |
| uuid_generate(uuid); |
| |
| YAML::Node connectionNode = iter->as<YAML::Node>(); |
| |
| std::string name = connectionNode["name"].as<std::string>(); |
| std::string destName = connectionNode["destination name"].as<std::string>(); |
| |
| char uuidStr[37]; |
| uuid_unparse(_uuid, uuidStr); |
| |
| _logger->log_debug("Created connection with UUID %s and name %s", uuidStr, name.c_str()); |
| connection = this->createConnection(name, uuid); |
| auto rawRelationship = connectionNode["source relationship name"].as<std::string>(); |
| Relationship relationship(rawRelationship, ""); |
| _logger->log_debug("parseConnection: relationship => [%s]", rawRelationship.c_str()); |
| if (connection) |
| connection->setRelationship(relationship); |
| std::string connectionSrcProcName = connectionNode["source name"].as<std::string>(); |
| Processor *srcProcessor = this->_root->findProcessor(connectionSrcProcName); |
| _logger->log_debug("I see processor with name %s looking for source with name %s", |
| this->_root->findProcessor(connectionSrcProcName)->getName().c_str(), |
| connectionSrcProcName.c_str()); |
| if (!srcProcessor) { |
| _logger->log_error("Could not locate a source with name %s to create a connection", |
| connectionSrcProcName.c_str()); |
| throw std::invalid_argument( |
| "Could not locate a source with name %s to create a connection " + connectionSrcProcName); |
| } |
| |
| _logger->log_debug("This processor has UUID of %s", srcProcessor->getUUIDStr().c_str()); |
| _logger->log_trace("Trying to find dest processor by name %s", destName.c_str()); |
| Processor *destProcessor = this->_root->findProcessor(destName); |
| // If we could not find name, try by UUID |
| if (!destProcessor) { |
| _logger->log_trace("Now looking up by uuid"); |
| uuid_t destUuid; |
| uuid_parse(destName.c_str(), destUuid); |
| destProcessor = this->_root->findProcessor(destUuid); |
| } |
| if (destProcessor) { |
| std::string destUuid = destProcessor->getUUIDStr(); |
| if (!destUuid.empty()) { |
| _logger->log_debug("This destination processor has UUID of %s", |
| destProcessor->getUUIDStr().c_str()); |
| } |
| } else { |
| _logger->log_debug("!!! === Could not find a destination processor for the connection."); |
| } |
| |
| uuid_t srcUuid; |
| uuid_t destUuid; |
| srcProcessor->getUUID(srcUuid); |
| connection->setSourceProcessorUUID(srcUuid); |
| destProcessor->getUUID(destUuid); |
| connection->setDestinationProcessorUUID(destUuid); |
| |
| if (connection) { |
| parent->addConnection(connection); |
| } |
| } |
| } |
| |
| if (connection) |
| parent->addConnection(connection); |
| |
| return; |
| } |
| } |
| |
| void FlowController::parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, ProcessGroup *parent) { |
| uuid_t uuid; |
| xmlNode *currentNode; |
| ProcessGroup *group = NULL; |
| int64_t yieldPeriod = -1; |
| int64_t timeOut = -1; |
| |
| // generate the random UIID |
| uuid_generate(uuid); |
| |
| for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { |
| if (currentNode->type == XML_ELEMENT_NODE) { |
| if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) { |
| char *id = (char *) xmlNodeGetContent(currentNode); |
| if (id) { |
| _logger->log_debug("parseRootProcessGroup: id => [%s]", id); |
| uuid_parse(id, uuid); |
| xmlFree(id); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { |
| char *name = (char *) xmlNodeGetContent(currentNode); |
| if (name) { |
| _logger->log_debug("parseRemoteProcessGroup: name => [%s]", name); |
| group = this->createRemoteProcessGroup(name, uuid); |
| if (group == NULL) { |
| xmlFree(name); |
| return; |
| } |
| group->setParent(parent); |
| parent->addProcessGroup(group); |
| xmlFree(name); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) { |
| TimeUnit unit; |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| if (temp) { |
| if (Property::StringToTime(temp, yieldPeriod, unit) |
| && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod) && group) { |
| _logger->log_debug("parseRemoteProcessGroup: yieldPeriod => [%d] ms", yieldPeriod); |
| group->setYieldPeriodMsec(yieldPeriod); |
| } |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "timeout") == 0) { |
| TimeUnit unit; |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| if (temp) { |
| if (Property::StringToTime(temp, timeOut, unit) |
| && Property::ConvertTimeUnitToMS(timeOut, unit, timeOut) && group) { |
| _logger->log_debug("parseRemoteProcessGroup: timeOut => [%d] ms", timeOut); |
| group->setTimeOut(timeOut); |
| } |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "transmitting") == 0) { |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| bool transmitting; |
| if (temp) { |
| if (Property::StringToBool(temp, transmitting) && group) { |
| _logger->log_debug("parseRemoteProcessGroup: transmitting => [%d]", transmitting); |
| group->setTransmitting(transmitting); |
| } |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "inputPort") == 0 && group) { |
| this->parsePort(doc, currentNode, group, SEND); |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "outputPort") == 0 && group) { |
| this->parsePort(doc, currentNode, group, RECEIVE); |
| } |
| } // if (currentNode->type == XML_ELEMENT_NODE) |
| } // for node |
| } |
| |
| void FlowController::parseProcessorProperty(xmlDoc *doc, xmlNode *node, Processor *processor) { |
| xmlNode *currentNode; |
| std::string propertyValue; |
| std::string propertyName; |
| |
| if (!processor) { |
| _logger->log_error("parseProcessorProperty: no parent processor existed"); |
| return; |
| } |
| |
| for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { |
| if (currentNode->type == XML_ELEMENT_NODE) { |
| if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { |
| char *name = (char *) xmlNodeGetContent(currentNode); |
| if (name) { |
| _logger->log_debug("parseProcessorNode: name => [%s]", name); |
| propertyName = name; |
| xmlFree(name); |
| } |
| } |
| if (xmlStrcmp(currentNode->name, BAD_CAST "value") == 0) { |
| char *value = (char *) xmlNodeGetContent(currentNode); |
| if (value) { |
| _logger->log_debug("parseProcessorNode: value => [%s]", value); |
| propertyValue = value; |
| xmlFree(value); |
| } |
| } |
| if (!propertyName.empty() && !propertyValue.empty()) { |
| processor->setProperty(propertyName, propertyValue); |
| } |
| } // if (currentNode->type == XML_ELEMENT_NODE) |
| } // for node |
| } |
| |
| void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, TransferDirection direction) { |
| uuid_t uuid; |
| Processor *processor = NULL; |
| RemoteProcessorGroupPort *port = NULL; |
| |
| _logger->log_trace("Creating a port from YAML."); |
| |
| if (!parent) { |
| _logger->log_error("parseProcessNode: no parent group existed"); |
| return; |
| } |
| |
| YAML::Node inputPortsObj = portNode->as<YAML::Node>(); |
| |
| // generate the random UIID |
| uuid_generate(uuid); |
| |
| auto portId = inputPortsObj["id"].as<std::string>(); |
| auto nameStr = inputPortsObj["name"].as<std::string>(); |
| uuid_parse(portId.c_str(), uuid); |
| |
| port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid); |
| _logger->log_debug("parse input port: name => [%s]", nameStr.c_str()); |
| _logger->log_debug("parse input port: id => [%s]", portId.c_str()); |
| |
| processor = (Processor *) port; |
| port->setDirection(direction); |
| port->setTimeOut(parent->getTimeOut()); |
| port->setTransmitting(true); |
| processor->setYieldPeriodMsec(parent->getYieldPeriodMsec()); |
| processor->initialize(); |
| |
| // handle port properties |
| YAML::Node nodeVal = portNode->as<YAML::Node>(); |
| YAML::Node propertiesNode = nodeVal["Properties"]; |
| std::vector<Property> properties; |
| |
| _logger->log_debug("!!! === Checking out properties for input port...."); |
| |
| if (propertiesNode.IsMap() && !propertiesNode.IsNull() && propertiesNode.size() > 0) { |
| std::map<std::string, std::string> propertiesMap = propertiesNode.as<std::map<std::string, std::string>>(); |
| _logger->log_debug("Found non-empty properties sequence... interpreting."); |
| for (std::map<std::string, std::string>::iterator propsIter = propertiesMap.begin(); |
| propsIter != propertiesMap.end(); propsIter++) { |
| std::string propertyName = propsIter->first; |
| std::string propertyValue = propsIter->second; |
| _logger->log_debug("Detected property %s => %s", propertyName.c_str(), propertyValue.c_str()); |
| if (!processor->setProperty(propertyName, propertyValue)) { |
| _logger->log_warn("Received property %s with value %s but is not one of the properties for %s", |
| propertyName.c_str(), propertyValue.c_str(), nameStr.c_str()); |
| } |
| } |
| } else { |
| _logger->log_debug("no properties here..."); |
| } |
| |
| // add processor to parent |
| parent->addProcessor(processor); |
| processor->setScheduledState(RUNNING); |
| auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>(); |
| int64_t maxConcurrentTasks; |
| if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) { |
| processor->setMaxConcurrentTasks(maxConcurrentTasks); |
| } |
| _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); |
| processor->setMaxConcurrentTasks(maxConcurrentTasks); |
| |
| } |
| |
| void FlowController::parsePort(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent, TransferDirection direction) { |
| char *id = NULL; |
| char *name = NULL; |
| uuid_t uuid; |
| xmlNode *currentNode; |
| Processor *processor = NULL; |
| RemoteProcessorGroupPort *port = NULL; |
| |
| if (!parent) { |
| _logger->log_error("parseProcessNode: no parent group existed"); |
| return; |
| } |
| // generate the random UIID |
| uuid_generate(uuid); |
| |
| for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { |
| if (currentNode->type == XML_ELEMENT_NODE) { |
| if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) { |
| id = (char *) xmlNodeGetContent(currentNode); |
| if (id) { |
| _logger->log_debug("parseProcessorNode: id => [%s]", id); |
| uuid_parse(id, uuid); |
| xmlFree(id); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { |
| name = (char *) xmlNodeGetContent(currentNode); |
| if (name) { |
| _logger->log_debug("parseProcessorNode: name => [%s]", name); |
| port = new RemoteProcessorGroupPort(name, uuid); |
| processor = (Processor *) port; |
| if (processor == NULL) { |
| xmlFree(name); |
| return; |
| } |
| port->setDirection(direction); |
| port->setTimeOut(parent->getTimeOut()); |
| port->setTransmitting(parent->getTransmitting()); |
| processor->setYieldPeriodMsec(parent->getYieldPeriodMsec()); |
| processor->initialize(); |
| // add processor to parent |
| parent->addProcessor(processor); |
| xmlFree(name); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) { |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| if (temp) { |
| std::string state = temp; |
| if (state == "DISABLED") { |
| _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); |
| processor->setScheduledState(DISABLED); |
| } |
| if (state == "STOPPED") { |
| _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); |
| processor->setScheduledState(STOPPED); |
| } |
| if (state == "RUNNING") { |
| _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); |
| processor->setScheduledState(RUNNING); |
| } |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) { |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| if (temp) { |
| int64_t maxConcurrentTasks; |
| if (Property::StringToInt(temp, maxConcurrentTasks)) { |
| _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); |
| processor->setMaxConcurrentTasks(maxConcurrentTasks); |
| } |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) { |
| this->parseProcessorProperty(doc, currentNode, processor); |
| } |
| } // if (currentNode->type == XML_ELEMENT_NODE) |
| } // while node |
| } |
| |
| void FlowController::parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent) { |
| char *id = NULL; |
| char *name = NULL; |
| int64_t schedulingPeriod = -1; |
| int64_t penalizationPeriod = -1; |
| int64_t yieldPeriod = -1; |
| bool lossTolerant = false; |
| int64_t runDurationNanos = -1; |
| uuid_t uuid; |
| xmlNode *currentNode; |
| Processor *processor = NULL; |
| |
| if (!parent) { |
| _logger->log_error("parseProcessNode: no parent group existed"); |
| return; |
| } |
| // generate the random UIID |
| uuid_generate(uuid); |
| |
| for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { |
| if (currentNode->type == XML_ELEMENT_NODE) { |
| if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) { |
| id = (char *) xmlNodeGetContent(currentNode); |
| if (id) { |
| _logger->log_debug("parseProcessorNode: id => [%s]", id); |
| uuid_parse(id, uuid); |
| xmlFree(id); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { |
| name = (char *) xmlNodeGetContent(currentNode); |
| if (name) { |
| _logger->log_debug("parseProcessorNode: name => [%s]", name); |
| processor = this->createProcessor(name, uuid); |
| if (processor == NULL) { |
| xmlFree(name); |
| return; |
| } |
| // add processor to parent |
| parent->addProcessor(processor); |
| xmlFree(name); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingPeriod") == 0) { |
| TimeUnit unit; |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| if (temp) { |
| if (Property::StringToTime(temp, schedulingPeriod, unit) |
| && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { |
| _logger->log_debug("parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod); |
| processor->setSchedulingPeriodNano(schedulingPeriod); |
| } |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "penalizationPeriod") == 0) { |
| TimeUnit unit; |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| if (temp) { |
| if (Property::StringToTime(temp, penalizationPeriod, unit) |
| && Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) { |
| _logger->log_debug("parseProcessorNode: penalizationPeriod => [%d] ms", penalizationPeriod); |
| processor->setPenalizationPeriodMsec(penalizationPeriod); |
| } |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) { |
| TimeUnit unit; |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| if (temp) { |
| if (Property::StringToTime(temp, yieldPeriod, unit) |
| && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) { |
| _logger->log_debug("parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod); |
| processor->setYieldPeriodMsec(yieldPeriod); |
| } |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "lossTolerant") == 0) { |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| if (temp) { |
| if (Property::StringToBool(temp, lossTolerant)) { |
| _logger->log_debug("parseProcessorNode: lossTolerant => [%d]", lossTolerant); |
| processor->setlossTolerant(lossTolerant); |
| } |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) { |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| if (temp) { |
| std::string state = temp; |
| if (state == "DISABLED") { |
| _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); |
| processor->setScheduledState(DISABLED); |
| } |
| if (state == "STOPPED") { |
| _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); |
| processor->setScheduledState(STOPPED); |
| } |
| if (state == "RUNNING") { |
| _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); |
| processor->setScheduledState(RUNNING); |
| } |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingStrategy") == 0) { |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| if (temp) { |
| std::string strategy = temp; |
| if (strategy == "TIMER_DRIVEN") { |
| _logger->log_debug("parseProcessorNode: scheduledStrategy => [%s]", strategy.c_str()); |
| processor->setSchedulingStrategy(TIMER_DRIVEN); |
| } |
| if (strategy == "EVENT_DRIVEN") { |
| _logger->log_debug("parseProcessorNode: scheduledStrategy => [%s]", strategy.c_str()); |
| processor->setSchedulingStrategy(EVENT_DRIVEN); |
| } |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) { |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| if (temp) { |
| int64_t maxConcurrentTasks; |
| if (Property::StringToInt(temp, maxConcurrentTasks)) { |
| _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); |
| processor->setMaxConcurrentTasks(maxConcurrentTasks); |
| } |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "runDurationNanos") == 0) { |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| if (temp) { |
| if (Property::StringToInt(temp, runDurationNanos)) { |
| _logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos); |
| processor->setRunDurationNano(runDurationNanos); |
| } |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "autoTerminatedRelationship") == 0) { |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| if (temp) { |
| std::string relationshipName = temp; |
| Relationship relationship(relationshipName, ""); |
| std::set<Relationship> relationships; |
| |
| relationships.insert(relationship); |
| processor->setAutoTerminatedRelationships(relationships); |
| _logger->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", |
| relationshipName.c_str()); |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) { |
| this->parseProcessorProperty(doc, currentNode, processor); |
| } |
| } // if (currentNode->type == XML_ELEMENT_NODE) |
| } // while node |
| } |
| |
| void FlowController::load(ConfigFormat configFormat) { |
| if (_running) { |
| stop(true); |
| } |
| if (!_initialized) { |
| _logger->log_info("Load Flow Controller from file %s", _configurationFileName.c_str()); |
| |
| if (ConfigFormat::XML == configFormat) { |
| _logger->log_info("Detected an XML configuration file for processing."); |
| |
| xmlDoc *doc = xmlReadFile(_configurationFileName.c_str(), NULL, XML_PARSE_NONET); |
| if (doc == NULL) { |
| _logger->log_error("xmlReadFile returned NULL when reading [%s]", _configurationFileName.c_str()); |
| _initialized = true; |
| return; |
| } |
| |
| xmlNode *root = xmlDocGetRootElement(doc); |
| |
| if (root == NULL) { |
| _logger->log_error("Can not get root from XML doc %s", _configurationFileName.c_str()); |
| xmlFreeDoc(doc); |
| xmlCleanupParser(); |
| } |
| |
| if (xmlStrcmp(root->name, BAD_CAST "flowController") != 0) { |
| _logger->log_error("Root name is not flowController for XML doc %s", _configurationFileName.c_str()); |
| xmlFreeDoc(doc); |
| xmlCleanupParser(); |
| return; |
| } |
| |
| xmlNode *currentNode; |
| |
| for (currentNode = root->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { |
| if (currentNode->type == XML_ELEMENT_NODE) { |
| if (xmlStrcmp(currentNode->name, BAD_CAST "rootGroup") == 0) { |
| this->parseRootProcessGroup(doc, currentNode); |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxTimerDrivenThreadCount") == 0) { |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| int64_t maxTimerDrivenThreadCount; |
| if (temp) { |
| if (Property::StringToInt(temp, maxTimerDrivenThreadCount)) { |
| _logger->log_debug("maxTimerDrivenThreadCount => [%d]", maxTimerDrivenThreadCount); |
| this->_maxTimerDrivenThreads = maxTimerDrivenThreadCount; |
| } |
| xmlFree(temp); |
| } |
| } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxEventDrivenThreadCount") == 0) { |
| char *temp = (char *) xmlNodeGetContent(currentNode); |
| int64_t maxEventDrivenThreadCount; |
| if (temp) { |
| if (Property::StringToInt(temp, maxEventDrivenThreadCount)) { |
| _logger->log_debug("maxEventDrivenThreadCount => [%d]", maxEventDrivenThreadCount); |
| this->_maxEventDrivenThreads = maxEventDrivenThreadCount; |
| } |
| xmlFree(temp); |
| } |
| } |
| } // type == XML_ELEMENT_NODE |
| } // for |
| |
| xmlFreeDoc(doc); |
| xmlCleanupParser(); |
| _initialized = true; |
| } else if (ConfigFormat::YAML == configFormat) { |
| _logger->log_info("Detected a YAML configuration file for processing."); |
| |
| YAML::Node flow = YAML::LoadFile(_configurationFileName); |
| |
| YAML::Node flowControllerNode = flow["Flow Controller"]; |
| YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY]; |
| YAML::Node connectionsNode = flow["Connections"]; |
| YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"]; |
| |
| // Create the root process group |
| parseRootProcessGroupYaml(flowControllerNode); |
| parseProcessorNodeYaml(processorsNode, this->_root); |
| parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, this->_root); |
| parseConnectionYaml(&connectionsNode, this->_root); |
| |
| _initialized = true; |
| } |
| } |
| } |
| |
| bool FlowController::start() { |
| if (!_initialized) { |
| _logger->log_error("Can not start Flow Controller because it has not been initialized"); |
| return false; |
| } else { |
| if (!_running) { |
| _logger->log_info("Start Flow Controller"); |
| this->_timerScheduler.start(); |
| if (this->_root) |
| this->_root->startProcessing(&this->_timerScheduler); |
| _running = true; |
| this->_protocol->start(); |
| } |
| return true; |
| } |
| } |