| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "core/yaml/YamlConfiguration.h" |
| #include <memory> |
| #include <string> |
| #include <vector> |
| #include <set> |
| namespace org { |
| namespace apache { |
| namespace nifi { |
| namespace minifi { |
| namespace core { |
| |
| core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml( |
| YAML::Node rootFlowNode) { |
| uuid_t uuid; |
| |
| checkRequiredField(&rootFlowNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); |
| std::string flowName = rootFlowNode["name"].as<std::string>(); |
| std::string id = getOrGenerateId(&rootFlowNode); |
| uuid_parse(id.c_str(), uuid); |
| |
| logger_->log_debug( |
| "parseRootProcessGroup: id => [%s], name => [%s]", id, flowName); |
| std::unique_ptr<core::ProcessGroup> group = |
| FlowConfiguration::createRootProcessGroup(flowName, uuid); |
| |
| this->name_ = flowName; |
| |
| return group.release(); |
| } |
| |
| void YamlConfiguration::parseProcessorNodeYaml( |
| YAML::Node processorsNode, core::ProcessGroup * parentGroup) { |
| int64_t schedulingPeriod = -1; |
| int64_t penalizationPeriod = -1; |
| int64_t yieldPeriod = -1; |
| int64_t runDurationNanos = -1; |
| uuid_t uuid; |
| std::shared_ptr<core::Processor> processor = nullptr; |
| |
| if (!parentGroup) { |
| logger_->log_error("parseProcessNodeYaml: no parent group exists"); |
| return; |
| } |
| |
| if (processorsNode) { |
| if (processorsNode.IsSequence()) { |
| // Evaluate sequence of processors |
| for (YAML::const_iterator iter = processorsNode.begin(); |
| iter != processorsNode.end(); ++iter) { |
| core::ProcessorConfig procCfg; |
| YAML::Node procNode = iter->as<YAML::Node>(); |
| |
| checkRequiredField(&procNode, "name", CONFIG_YAML_PROCESSORS_KEY); |
| procCfg.name = procNode["name"].as<std::string>(); |
| procCfg.id = getOrGenerateId(&procNode); |
| uuid_parse(procCfg.id.c_str(), uuid); |
| logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", |
| procCfg.name, procCfg.id); |
| checkRequiredField(&procNode, "class", CONFIG_YAML_PROCESSORS_KEY); |
| procCfg.javaClass = procNode["class"].as<std::string>(); |
| logger_->log_debug("parseProcessorNode: class => [%s]", |
| procCfg.javaClass); |
| |
| // Determine the processor name only from the Java class |
| int lastOfIdx = procCfg.javaClass.find_last_of("."); |
| if (lastOfIdx != std::string::npos) { |
| lastOfIdx++; // if a value is found, increment to move beyond the . |
| int nameLength = procCfg.javaClass.length() - lastOfIdx; |
| std::string processorName = procCfg.javaClass.substr(lastOfIdx, |
| nameLength); |
| processor = this->createProcessor(processorName, uuid); |
| } |
| |
| if (!processor) { |
| logger_->log_error("Could not create a processor %s with id %s", |
| procCfg.name, procCfg.id); |
| throw std::invalid_argument( |
| "Could not create processor " + procCfg.name); |
| } |
| processor->setName(procCfg.name); |
| |
| checkRequiredField(&procNode, "scheduling strategy", CONFIG_YAML_PROCESSORS_KEY); |
| procCfg.schedulingStrategy = procNode["scheduling strategy"].as<std::string>(); |
| logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy); |
| |
| checkRequiredField(&procNode, "scheduling period", CONFIG_YAML_PROCESSORS_KEY); |
| procCfg.schedulingPeriod = procNode["scheduling period"].as<std::string>(); |
| logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod); |
| |
| if (procNode["max concurrent tasks"]) { |
| procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>(); |
| logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks); |
| } |
| |
| if (procNode["penalization period"]) { |
| procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>(); |
| logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod); |
| } |
| |
| if (procNode["yield period"]) { |
| procCfg.yieldPeriod = procNode["yield period"].as<std::string>(); |
| logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod); |
| } |
| |
| if (procNode["run duration nanos"]) { |
| procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>(); |
| logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos); |
| } |
| |
| // handle auto-terminated relationships |
| if (procNode["auto-terminated relationships list"]) { |
| YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"]; |
| std::vector<std::string> rawAutoTerminatedRelationshipValues; |
| if (autoTerminatedSequence.IsSequence() |
| && !autoTerminatedSequence.IsNull() |
| && autoTerminatedSequence.size() > 0) { |
| for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); |
| relIter != autoTerminatedSequence.end(); ++relIter) { |
| std::string autoTerminatedRel = relIter->as<std::string>(); |
| rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel); |
| } |
| } |
| procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues; |
| } |
| |
| // handle processor properties |
| if (procNode["Properties"]) { |
| YAML::Node propertiesNode = procNode["Properties"]; |
| parsePropertiesNodeYaml(&propertiesNode, processor); |
| } |
| |
| // Take care of scheduling |
| core::TimeUnit unit; |
| if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) |
| && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { |
| logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod); |
| processor->setSchedulingPeriodNano(schedulingPeriod); |
| } |
| |
| if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) |
| && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) { |
| logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms", penalizationPeriod); |
| processor->setPenalizationPeriodMsec(penalizationPeriod); |
| } |
| |
| if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) |
| && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) { |
| logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod); |
| processor->setYieldPeriodMsec(yieldPeriod); |
| } |
| |
| // Default to running |
| processor->setScheduledState(core::RUNNING); |
| |
| if (procCfg.schedulingStrategy == "TIMER_DRIVEN") { |
| processor->setSchedulingStrategy(core::TIMER_DRIVEN); |
| logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy); |
| } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") { |
| processor->setSchedulingStrategy(core::EVENT_DRIVEN); |
| logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy); |
| } else { |
| processor->setSchedulingStrategy(core::CRON_DRIVEN); |
| logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy); |
| } |
| |
| int64_t maxConcurrentTasks; |
| if (core::Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) { |
| logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); |
| processor->setMaxConcurrentTasks((uint8_t) maxConcurrentTasks); |
| } |
| |
| if (core::Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) { |
| logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos); |
| processor->setRunDurationNano((uint64_t) runDurationNanos); |
| } |
| |
| std::set<core::Relationship> autoTerminatedRelationships; |
| for (auto &&relString : procCfg.autoTerminatedRelationships) { |
| core::Relationship relationship(relString, ""); |
| logger_->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", relString); |
| autoTerminatedRelationships.insert(relationship); |
| } |
| |
| processor->setAutoTerminatedRelationships(autoTerminatedRelationships); |
| |
| parentGroup->addProcessor(processor); |
| } |
| } |
| } else { |
| throw new std::invalid_argument( |
| "Cannot instantiate a MiNiFi instance without a defined " |
| "Processors configuration node."); |
| } |
| } |
| |
| void YamlConfiguration::parseRemoteProcessGroupYaml( |
| YAML::Node *rpgNode, core::ProcessGroup * parentGroup) { |
| uuid_t uuid; |
| std::string id; |
| |
| if (!parentGroup) { |
| logger_->log_error("parseRemoteProcessGroupYaml: no parent group exists"); |
| return; |
| } |
| |
| if (rpgNode) { |
| if (rpgNode->IsSequence()) { |
| for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); |
| ++iter) { |
| YAML::Node currRpgNode = iter->as<YAML::Node>(); |
| |
| checkRequiredField(&currRpgNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); |
| auto name = currRpgNode["name"].as<std::string>(); |
| id = getOrGenerateId(&currRpgNode); |
| |
| logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id); |
| |
| checkRequiredField(&currRpgNode, "url", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); |
| std::string url = currRpgNode["url"].as<std::string>(); |
| logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url); |
| |
| core::ProcessGroup *group = NULL; |
| core::TimeUnit unit; |
| int64_t timeoutValue = -1; |
| int64_t yieldPeriodValue = -1; |
| uuid_parse(id.c_str(), uuid); |
| group = this->createRemoteProcessGroup(name.c_str(), uuid).release(); |
| group->setParent(parentGroup); |
| parentGroup->addProcessGroup(group); |
| |
| if (currRpgNode["yield period"]) { |
| std::string yieldPeriod = currRpgNode["yield period"].as<std::string>(); |
| logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod); |
| |
| if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) |
| && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, |
| yieldPeriodValue) && group) { |
| logger_->log_debug( |
| "parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", |
| yieldPeriodValue); |
| group->setYieldPeriodMsec(yieldPeriodValue); |
| } |
| } |
| |
| if (currRpgNode["timeout"]) { |
| std::string timeout = currRpgNode["timeout"].as<std::string>(); |
| logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout); |
| |
| if (core::Property::StringToTime(timeout, timeoutValue, unit) |
| && core::Property::ConvertTimeUnitToMS(timeoutValue, unit, |
| timeoutValue) && group) { |
| logger_->log_debug( |
| "parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", |
| timeoutValue); |
| group->setTimeOut(timeoutValue); |
| } |
| } |
| |
| group->setTransmitting(true); |
| group->setURL(url); |
| |
| checkRequiredField(&currRpgNode, "Input Ports", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); |
| YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>(); |
| if (inputPorts && inputPorts.IsSequence()) { |
| for (YAML::const_iterator portIter = inputPorts.begin(); |
| portIter != inputPorts.end(); ++portIter) { |
| logger_->log_debug("Got a current port, iterating..."); |
| |
| YAML::Node currPort = portIter->as<YAML::Node>(); |
| |
| this->parsePortYaml(&currPort, group, SEND); |
| } // for node |
| } |
| YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>(); |
| if (outputPorts && outputPorts.IsSequence()) { |
| for (YAML::const_iterator portIter = outputPorts.begin(); |
| portIter != outputPorts.end(); ++portIter) { |
| logger_->log_debug("Got a current port, iterating..."); |
| |
| YAML::Node currPort = portIter->as<YAML::Node>(); |
| |
| this->parsePortYaml(&currPort, group, RECEIVE); |
| } // for node |
| } |
| } |
| } |
| } |
| } |
| |
| void YamlConfiguration::parseProvenanceReportingYaml( |
| YAML::Node *reportNode, core::ProcessGroup * parentGroup) { |
| uuid_t port_uuid; |
| int64_t schedulingPeriod = -1; |
| |
| if (!parentGroup) { |
| logger_->log_error("parseProvenanceReportingYaml: no parent group exists"); |
| return; |
| } |
| |
| if (!reportNode || !reportNode->IsDefined() || reportNode->IsNull()) { |
| logger_->log_debug("no provenance reporting task specified"); |
| return; |
| } |
| |
| std::shared_ptr<core::Processor> processor = nullptr; |
| processor = createProvenanceReportTask(); |
| std::shared_ptr<core::reporting::SiteToSiteProvenanceReportingTask> reportTask = |
| std::static_pointer_cast < core::reporting::SiteToSiteProvenanceReportingTask |
| > (processor); |
| |
| YAML::Node node = reportNode->as<YAML::Node>(); |
| |
| checkRequiredField(&node, "scheduling strategy", CONFIG_YAML_PROVENANCE_REPORT_KEY); |
| auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>(); |
| checkRequiredField(&node, "scheduling period", CONFIG_YAML_PROVENANCE_REPORT_KEY); |
| auto schedulingPeriodStr = node["scheduling period"].as<std::string>(); |
| checkRequiredField(&node, "host", CONFIG_YAML_PROVENANCE_REPORT_KEY); |
| auto hostStr = node["host"].as<std::string>(); |
| checkRequiredField(&node, "port", CONFIG_YAML_PROVENANCE_REPORT_KEY); |
| auto portStr = node["port"].as<std::string>(); |
| checkRequiredField(&node, "port uuid", CONFIG_YAML_PROVENANCE_REPORT_KEY); |
| auto portUUIDStr = node["port uuid"].as<std::string>(); |
| checkRequiredField(&node, "batch size", CONFIG_YAML_PROVENANCE_REPORT_KEY); |
| auto batchSizeStr = node["batch size"].as<std::string>(); |
| |
| // add processor to parent |
| parentGroup->addProcessor(processor); |
| processor->setScheduledState(core::RUNNING); |
| |
| core::TimeUnit unit; |
| if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit) && |
| core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { |
| logger_->log_debug( |
| "ProvenanceReportingTask schedulingPeriod %d ns", |
| schedulingPeriod); |
| processor->setSchedulingPeriodNano(schedulingPeriod); |
| } |
| |
| if (schedulingStrategyStr == "TIMER_DRIVEN") { |
| processor->setSchedulingStrategy(core::TIMER_DRIVEN); |
| logger_->log_debug( |
| "ProvenanceReportingTask scheduling strategy %s", schedulingStrategyStr); |
| } else { |
| throw std::invalid_argument( |
| "Invalid scheduling strategy " + schedulingStrategyStr); |
| } |
| |
| reportTask->setHost(hostStr); |
| logger_->log_debug("ProvenanceReportingTask host %s", hostStr); |
| int64_t lvalue; |
| if (core::Property::StringToInt(portStr, lvalue)) { |
| logger_->log_debug("ProvenanceReportingTask port %d", (uint16_t) lvalue); |
| reportTask->setPort((uint16_t) lvalue); |
| } |
| logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr); |
| uuid_parse(portUUIDStr.c_str(), port_uuid); |
| reportTask->setPortUUID(port_uuid); |
| if (core::Property::StringToInt(batchSizeStr, lvalue)) { |
| reportTask->setBatchSize(lvalue); |
| } |
| } |
| |
| void YamlConfiguration::parseConnectionYaml( |
| YAML::Node *connectionsNode, |
| core::ProcessGroup *parent) { |
| if (!parent) { |
| logger_->log_error("parseProcessNode: no parent group was provided"); |
| return; |
| } |
| |
| if (connectionsNode) { |
| if (connectionsNode->IsSequence()) { |
| for (YAML::const_iterator iter = connectionsNode->begin(); |
| iter != connectionsNode->end(); ++iter) { |
| YAML::Node connectionNode = iter->as<YAML::Node>(); |
| std::shared_ptr<minifi::Connection> connection = nullptr; |
| |
| // Configure basic connection |
| uuid_t uuid; |
| checkRequiredField(&connectionNode, "name", CONFIG_YAML_CONNECTIONS_KEY); |
| std::string name = connectionNode["name"].as<std::string>(); |
| std::string id = getOrGenerateId(&connectionNode); |
| uuid_parse(id.c_str(), uuid); |
| connection = this->createConnection(name, uuid); |
| logger_->log_debug( |
| "Created connection with UUID %s and name %s", id, name); |
| |
| |
| // Configure connection source |
| checkRequiredField(&connectionNode, "source relationship name", CONFIG_YAML_CONNECTIONS_KEY); |
| auto rawRelationship = connectionNode["source relationship name"].as<std::string>(); |
| core::Relationship relationship(rawRelationship, ""); |
| logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship); |
| if (connection) { |
| connection->setRelationship(relationship); |
| } |
| |
| uuid_t srcUUID; |
| |
| if (connectionNode["source id"]) { |
| std::string connectionSrcProcId = connectionNode["source id"].as<std::string>(); |
| uuid_parse(connectionSrcProcId.c_str(), srcUUID); |
| logger_->log_debug("Using 'source id' to match source with same id for " |
| "connection '%s': source id => [%s]", name, connectionSrcProcId); |
| } else { |
| // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary |
| checkRequiredField(&connectionNode, "source name", CONFIG_YAML_CONNECTIONS_KEY); |
| std::string connectionSrcProcName = connectionNode["source name"].as<std::string>(); |
| uuid_t tmpUUID; |
| if (!uuid_parse(connectionSrcProcName.c_str(), tmpUUID) && NULL != parent->findProcessor(tmpUUID)) { |
| // the source name is a remote port id, so use that as the source id |
| uuid_copy(srcUUID, tmpUUID); |
| logger_->log_debug("Using 'source name' containing a remote port id to match the source for " |
| "connection '%s': source name => [%s]", name, connectionSrcProcName); |
| } else { |
| // lastly, look the processor up by name |
| auto srcProcessor = parent->findProcessor(connectionSrcProcName); |
| if (NULL != srcProcessor) { |
| srcProcessor->getUUID(srcUUID); |
| logger_->log_debug("Using 'source name' to match source with same name for " |
| "connection '%s': source name => [%s]", name, connectionSrcProcName); |
| } else { |
| // we ran out of ways to discover the source processor |
| logger_->log_error( |
| "Could not locate a source with name %s to create a connection", connectionSrcProcName); |
| throw std::invalid_argument( |
| "Could not locate a source with name " + connectionSrcProcName + " to create a connection "); |
| } |
| } |
| } |
| connection->setSourceUUID(srcUUID); |
| |
| // Configure connection destination |
| uuid_t destUUID; |
| if (connectionNode["destination id"]) { |
| std::string connectionDestProcId = connectionNode["destination id"].as<std::string>(); |
| uuid_parse(connectionDestProcId.c_str(), destUUID); |
| logger_->log_debug("Using 'destination id' to match destination with same id for " |
| "connection '%s': destination id => [%s]", name, connectionDestProcId); |
| } else { |
| // we use the same logic as above for resolving the source processor |
| // for looking up the destination processor in absence of a processor id |
| checkRequiredField(&connectionNode, "destination name", CONFIG_YAML_CONNECTIONS_KEY); |
| std::string connectionDestProcName = connectionNode["destination name"].as<std::string>(); |
| uuid_t tmpUUID; |
| if (!uuid_parse(connectionDestProcName.c_str(), tmpUUID) && |
| NULL != parent->findProcessor(tmpUUID)) { |
| // the destination name is a remote port id, so use that as the dest id |
| uuid_copy(destUUID, tmpUUID); |
| logger_->log_debug("Using 'destination name' containing a remote port id to match the destination for " |
| "connection '%s': destination name => [%s]", name, connectionDestProcName); |
| } else { |
| // look the processor up by name |
| auto destProcessor = parent->findProcessor(connectionDestProcName); |
| if (NULL != destProcessor) { |
| destProcessor->getUUID(destUUID); |
| logger_->log_debug("Using 'destination name' to match destination with same name for " |
| "connection '%s': destination name => [%s]", name, connectionDestProcName); |
| } else { |
| // we ran out of ways to discover the destination processor |
| logger_->log_error( |
| "Could not locate a destination with name %s to create a connection", connectionDestProcName); |
| throw std::invalid_argument( |
| "Could not locate a destination with name " + connectionDestProcName + " to create a connection"); |
| } |
| } |
| } |
| connection->setDestinationUUID(destUUID); |
| |
| if (connection) { |
| parent->addConnection(connection); |
| } |
| } |
| } |
| } |
| } |
| |
| void YamlConfiguration::parsePortYaml(YAML::Node *portNode, |
| core::ProcessGroup *parent, |
| TransferDirection direction) { |
| uuid_t uuid; |
| std::shared_ptr<core::Processor> processor = NULL; |
| std::shared_ptr<minifi::RemoteProcessorGroupPort> port = NULL; |
| |
| if (!parent) { |
| logger_->log_error("parseProcessNode: no parent group existed"); |
| return; |
| } |
| |
| YAML::Node inputPortsObj = portNode->as<YAML::Node>(); |
| |
| // Check for required fields |
| checkRequiredField(&inputPortsObj, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); |
| auto nameStr = inputPortsObj["name"].as<std::string>(); |
| checkRequiredField(&inputPortsObj, "id", "The field 'id' is required for " |
| "the port named '" + nameStr + "' in the YAML Config. If this port " |
| "is an input port for a NiFi Remote Process Group, the port " |
| "id should match the corresponding id specified in the NiFi configuration. " |
| "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX."); |
| auto portId = inputPortsObj["id"].as<std::string>(); |
| uuid_parse(portId.c_str(), uuid); |
| |
| port = std::make_shared<minifi::RemoteProcessorGroupPort>(nameStr, uuid); |
| |
| processor = std::static_pointer_cast<core::Processor>(port); |
| port->setDirection(direction); |
| port->setTimeOut(parent->getTimeOut()); |
| port->setTransmitting(true); |
| processor->setYieldPeriodMsec(parent->getYieldPeriodMsec()); |
| processor->initialize(); |
| |
| // handle port properties |
| YAML::Node nodeVal = portNode->as<YAML::Node>(); |
| YAML::Node propertiesNode = nodeVal["Properties"]; |
| parsePropertiesNodeYaml(&propertiesNode, processor); |
| |
| // add processor to parent |
| parent->addProcessor(processor); |
| processor->setScheduledState(core::RUNNING); |
| |
| if (inputPortsObj["max concurrent tasks"]) { |
| auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>(); |
| int64_t maxConcurrentTasks; |
| if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) { |
| processor->setMaxConcurrentTasks(maxConcurrentTasks); |
| } |
| logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); |
| processor->setMaxConcurrentTasks(maxConcurrentTasks); |
| } |
| } |
| |
| void YamlConfiguration::parsePropertiesNodeYaml( |
| YAML::Node *propertiesNode, std::shared_ptr<core::Processor> processor) { |
| // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated |
| for (YAML::const_iterator propsIter = propertiesNode->begin(); |
| propsIter != propertiesNode->end(); ++propsIter) { |
| std::string propertyName = propsIter->first.as<std::string>(); |
| YAML::Node propertyValueNode = propsIter->second; |
| if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined()) { |
| std::string rawValueString = propertyValueNode.as<std::string>(); |
| if (!processor->setProperty(propertyName, rawValueString)) { |
| logger_->log_warn( |
| "Received property %s with value %s but it is not one of the properties for %s", |
| propertyName, |
| rawValueString, |
| processor->getName()); |
| } |
| } |
| } |
| } |
| |
| std::string YamlConfiguration::getOrGenerateId( |
| YAML::Node *yamlNode, |
| const std::string &idField) { |
| std::string id; |
| YAML::Node node = yamlNode->as<YAML::Node>(); |
| |
| if (node[idField]) { |
| if (YAML::NodeType::Scalar == node[idField].Type()) { |
| id = node[idField].as<std::string>(); |
| } else { |
| throw std::invalid_argument( |
| "getOrGenerateId: idField is expected to reference YAML::Node " |
| "of YAML::NodeType::Scalar."); |
| } |
| } else { |
| uuid_t uuid; |
| uuid_generate(uuid); |
| char uuid_str[37]; |
| uuid_unparse(uuid, uuid_str); |
| id = uuid_str; |
| logger_->log_debug("Generating random ID: id => [%s]", id); |
| } |
| return id; |
| } |
| |
| void YamlConfiguration::checkRequiredField( |
| YAML::Node *yamlNode, |
| const std::string &fieldName, |
| const std::string &yamlSection, |
| const std::string &errorMessage) { |
| |
| std::string errMsg = errorMessage; |
| if (!yamlNode->as<YAML::Node>()[fieldName]) { |
| if (errMsg.empty()) { |
| // Build a helpful error message for the user so they can fix the |
| // invalid YAML config file, using the component name if present |
| errMsg = |
| yamlNode->as<YAML::Node>()["name"] ? |
| "Unable to parse configuration file for component named '" + |
| yamlNode->as<YAML::Node>()["name"].as<std::string>() + |
| "' as required field '" + fieldName + "' is missing" : |
| "Unable to parse configuration file as required field '" + |
| fieldName + "' is missing"; |
| if (!yamlSection.empty()) { |
| errMsg += " [in '" + yamlSection + |
| "' section of configuration file]"; |
| } |
| } |
| logger_->log_error(errMsg.c_str()); |
| throw std::invalid_argument(errMsg); |
| } |
| } |
| |
| } /* namespace core */ |
| } /* namespace minifi */ |
| } /* namespace nifi */ |
| } /* namespace apache */ |
| } /* namespace org */ |