| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <memory> |
| #include <vector> |
| #include <set> |
| #include <cinttypes> |
| |
| #include "core/yaml/YamlConfiguration.h" |
| #include "core/yaml/CheckRequiredField.h" |
| #include "core/yaml/YamlConnectionParser.h" |
| #include "core/state/Value.h" |
| #include "Defaults.h" |
| #include "utils/TimeUtil.h" |
| #include "Funnel.h" |
| |
| #ifdef YAML_CONFIGURATION_USE_REGEX |
| #include "utils/RegexUtils.h" |
| #endif // YAML_CONFIGURATION_USE_REGEX |
| |
| namespace org::apache::nifi::minifi::core { |
| |
| std::shared_ptr<utils::IdGenerator> YamlConfiguration::id_generator_ = utils::IdGenerator::getIdGenerator(); |
| |
| YamlConfiguration::YamlConfiguration(const std::shared_ptr<core::Repository>& repo, const std::shared_ptr<core::Repository>& flow_file_repo, |
| const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<io::StreamFactory>& stream_factory, |
| const std::shared_ptr<Configure>& configuration, const std::optional<std::string>& path, |
| const std::shared_ptr<utils::file::FileSystem>& filesystem) |
| : FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configuration, |
| path.value_or(DEFAULT_NIFI_CONFIG_YML), filesystem), |
| stream_factory_(stream_factory), |
| logger_(logging::LoggerFactory<YamlConfiguration>::getLogger()) {} |
| |
| std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseRootProcessGroupYaml(const YAML::Node& rootFlowNode) { |
| auto flowControllerNode = rootFlowNode[CONFIG_YAML_FLOW_CONTROLLER_KEY]; |
| auto rootGroup = parseProcessGroupYaml(flowControllerNode, rootFlowNode, true); |
| this->name_ = rootGroup->getName(); |
| return rootGroup; |
| } |
| |
| std::unique_ptr<core::ProcessGroup> YamlConfiguration::createProcessGroup(const YAML::Node& yamlNode, bool is_root) { |
| int version = 0; |
| |
| yaml::checkRequiredField(yamlNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); |
| auto flowName = yamlNode["name"].as<std::string>(); |
| |
| utils::Identifier uuid; |
| // assignment throws on invalid uuid |
| uuid = getOrGenerateId(yamlNode); |
| |
| if (yamlNode["version"]) { |
| version = yamlNode["version"].as<int>(); |
| } |
| |
| logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", uuid.to_string(), flowName); |
| std::unique_ptr<core::ProcessGroup> group; |
| if (is_root) { |
| group = FlowConfiguration::createRootProcessGroup(flowName, uuid, version); |
| } else { |
| group = FlowConfiguration::createSimpleProcessGroup(flowName, uuid, version); |
| } |
| |
| if (yamlNode["onschedule retry interval"]) { |
| auto onScheduleRetryPeriod = yamlNode["onschedule retry interval"].as<std::string>(); |
| logger_->log_debug("parseRootProcessGroup: onschedule retry period => [%s]", onScheduleRetryPeriod); |
| |
| auto on_schedule_retry_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(onScheduleRetryPeriod); |
| if (on_schedule_retry_period_value.has_value() && group) { |
| logger_->log_debug("parseRootProcessGroup: onschedule retry => [%" PRId64 "] ms", on_schedule_retry_period_value->count()); |
| group->setOnScheduleRetryPeriod(on_schedule_retry_period_value->count()); |
| } |
| } |
| |
| return group; |
| } |
| |
| std::unique_ptr<core::ProcessGroup> YamlConfiguration::parseProcessGroupYaml(const YAML::Node& headerNode, const YAML::Node& yamlNode, bool is_root) { |
| auto group = createProcessGroup(headerNode, is_root); |
| YAML::Node processorsNode = yamlNode[CONFIG_YAML_PROCESSORS_KEY]; |
| YAML::Node connectionsNode = yamlNode[yaml::YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY]; |
| YAML::Node funnelsNode = yamlNode[CONFIG_YAML_FUNNELS_KEY]; |
| YAML::Node inputPortsNode = yamlNode[CONFIG_YAML_INPUT_PORTS_KEY]; |
| YAML::Node outputPortsNode = yamlNode[CONFIG_YAML_OUTPUT_PORTS_KEY]; |
| YAML::Node remoteProcessingGroupsNode = [&] { |
| // assignment is not supported on invalid Yaml nodes |
| YAML::Node candidate = yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY]; |
| if (candidate) { |
| return candidate; |
| } |
| return yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3]; |
| }(); |
| YAML::Node childProcessGroupNodeSeq = yamlNode["Process Groups"]; |
| |
| parseProcessorNodeYaml(processorsNode, group.get()); |
| parseRemoteProcessGroupYaml(remoteProcessingGroupsNode, group.get()); |
| parseFunnelsYaml(funnelsNode, group.get()); |
| parsePorts(inputPortsNode, group.get(), PortType::INPUT); |
| parsePorts(outputPortsNode, group.get(), PortType::OUTPUT); |
| |
| if (childProcessGroupNodeSeq && childProcessGroupNodeSeq.IsSequence()) { |
| for (YAML::const_iterator it = childProcessGroupNodeSeq.begin(); it != childProcessGroupNodeSeq.end(); ++it) { |
| auto childProcessGroupNode = it->as<YAML::Node>(); |
| group->addProcessGroup(parseProcessGroupYaml(childProcessGroupNode, childProcessGroupNode)); |
| } |
| } |
| |
| // parse connections last to give feedback if the source and/or destination processors |
| // is not in the same process group or input/output port connections are not allowed |
| parseConnectionYaml(connectionsNode, group.get()); |
| return group; |
| } |
| |
| std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(const YAML::Node& rootYamlNode) { |
| uuids_.clear(); |
| YAML::Node controllerServiceNode = rootYamlNode[CONFIG_YAML_CONTROLLER_SERVICES_KEY]; |
| YAML::Node provenanceReportNode = rootYamlNode[CONFIG_YAML_PROVENANCE_REPORT_KEY]; |
| |
| parseControllerServices(controllerServiceNode); |
| // Create the root process group |
| std::unique_ptr<core::ProcessGroup> root = parseRootProcessGroupYaml(rootYamlNode); |
| parseProvenanceReportingYaml(provenanceReportNode, root.get()); |
| |
| // set the controller services into the root group. |
| for (const auto& controller_service : controller_services_->getAllControllerServices()) { |
| root->addControllerService(controller_service->getName(), controller_service); |
| root->addControllerService(controller_service->getUUIDStr(), controller_service); |
| } |
| |
| root->verify(); |
| |
| return root; |
| } |
| |
| void YamlConfiguration::parseProcessorNodeYaml(const YAML::Node& processorsNode, core::ProcessGroup* parentGroup) { |
| int64_t runDurationNanos = -1; |
| utils::Identifier uuid; |
| std::unique_ptr<core::Processor> processor; |
| |
| if (!parentGroup) { |
| logger_->log_error("parseProcessNodeYaml: no parent group exists"); |
| return; |
| } |
| |
| if (!processorsNode) { |
| throw std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node."); |
| } |
| if (!processorsNode.IsSequence()) { |
| throw std::invalid_argument( |
| "Cannot instantiate a MiNiFi instance without a defined Processors configuration node."); |
| } |
| // Evaluate sequence of processors |
| for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) { |
| core::ProcessorConfig procCfg; |
| const auto procNode = iter->as<YAML::Node>(); |
| |
| yaml::checkRequiredField(procNode, "name", CONFIG_YAML_PROCESSORS_KEY); |
| procCfg.name = procNode["name"].as<std::string>(); |
| procCfg.id = getOrGenerateId(procNode); |
| |
| uuid = procCfg.id; |
| logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id); |
| yaml::checkRequiredField(procNode, "class", CONFIG_YAML_PROCESSORS_KEY); |
| procCfg.javaClass = procNode["class"].as<std::string>(); |
| logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass); |
| |
| // Determine the processor name only from the Java class |
| auto lastOfIdx = procCfg.javaClass.find_last_of('.'); |
| if (lastOfIdx != std::string::npos) { |
| lastOfIdx++; // if a value is found, increment to move beyond the . |
| std::string processorName = procCfg.javaClass.substr(lastOfIdx); |
| processor = this->createProcessor(processorName, procCfg.javaClass, uuid); |
| } else { |
| // Allow unqualified class names for core processors |
| processor = this->createProcessor(procCfg.javaClass, uuid); |
| } |
| |
| if (!processor) { |
| logger_->log_error("Could not create a processor %s with id %s", procCfg.name, procCfg.id); |
| throw std::invalid_argument("Could not create processor " + procCfg.name); |
| } |
| |
| processor->setName(procCfg.name); |
| |
| processor->setFlowIdentifier(flow_version_->getFlowIdentifier()); |
| |
| auto strategyNode = getOptionalField(procNode, "scheduling strategy", YAML::Node(DEFAULT_SCHEDULING_STRATEGY), |
| CONFIG_YAML_PROCESSORS_KEY); |
| procCfg.schedulingStrategy = strategyNode.as<std::string>(); |
| logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy); |
| |
| auto periodNode = getOptionalField(procNode, "scheduling period", YAML::Node(DEFAULT_SCHEDULING_PERIOD_STR), |
| CONFIG_YAML_PROCESSORS_KEY); |
| |
| procCfg.schedulingPeriod = periodNode.as<std::string>(); |
| logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod); |
| |
| if (procNode["max concurrent tasks"]) { |
| procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>(); |
| logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks); |
| } |
| |
| if (procNode["penalization period"]) { |
| procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>(); |
| logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod); |
| } |
| |
| if (procNode["yield period"]) { |
| procCfg.yieldPeriod = procNode["yield period"].as<std::string>(); |
| logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod); |
| } |
| |
| if (procNode["run duration nanos"]) { |
| procCfg.runDurationNanos = procNode["run duration nanos"].as<std::string>(); |
| logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos); |
| } |
| |
| // handle auto-terminated relationships |
| if (procNode["auto-terminated relationships list"]) { |
| YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"]; |
| std::vector<std::string> rawAutoTerminatedRelationshipValues; |
| if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() && autoTerminatedSequence.size() > 0) { |
| for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); relIter != autoTerminatedSequence.end(); ++relIter) { |
| auto autoTerminatedRel = relIter->as<std::string>(); |
| rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel); |
| } |
| } |
| procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues; |
| } |
| |
| // handle processor properties |
| if (procNode["Properties"]) { |
| YAML::Node propertiesNode = procNode["Properties"]; |
| parsePropertiesNodeYaml(propertiesNode, *processor, procCfg.name, CONFIG_YAML_PROCESSORS_KEY); |
| } |
| |
| // Take care of scheduling |
| |
| if (procCfg.schedulingStrategy == "TIMER_DRIVEN" || procCfg.schedulingStrategy == "EVENT_DRIVEN") { |
| if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(procCfg.schedulingPeriod)) { |
| logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%" PRId64 "] ns", scheduling_period->count()); |
| processor->setSchedulingPeriodNano(*scheduling_period); |
| } |
| } else { |
| processor->setCronPeriod(procCfg.schedulingPeriod); |
| } |
| |
| if (auto penalization_period = utils::timeutils::StringToDuration<std::chrono::milliseconds>(procCfg.penalizationPeriod)) { |
| logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%" PRId64 "] ms", penalization_period->count()); |
| processor->setPenalizationPeriod(penalization_period.value()); |
| } |
| |
| if (auto yield_period = utils::timeutils::StringToDuration<std::chrono::milliseconds>(procCfg.yieldPeriod)) { |
| logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%" PRId64 "] ms", yield_period->count()); |
| processor->setYieldPeriodMsec(yield_period.value()); |
| } |
| |
| // Default to running |
| processor->setScheduledState(core::RUNNING); |
| |
| if (procCfg.schedulingStrategy == "TIMER_DRIVEN") { |
| processor->setSchedulingStrategy(core::TIMER_DRIVEN); |
| logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy); |
| } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") { |
| processor->setSchedulingStrategy(core::EVENT_DRIVEN); |
| logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy); |
| } else { |
| processor->setSchedulingStrategy(core::CRON_DRIVEN); |
| logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy); |
| } |
| |
| int32_t maxConcurrentTasks; |
| if (core::Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) { |
| logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); |
| processor->setMaxConcurrentTasks((uint8_t) maxConcurrentTasks); |
| } |
| |
| if (core::Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) { |
| logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos); |
| processor->setRunDurationNano(std::chrono::nanoseconds(runDurationNanos)); |
| } |
| |
| std::vector<core::Relationship> autoTerminatedRelationships; |
| for (auto &&relString : procCfg.autoTerminatedRelationships) { |
| core::Relationship relationship(relString, ""); |
| logger_->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", relString); |
| autoTerminatedRelationships.push_back(relationship); |
| } |
| |
| processor->setAutoTerminatedRelationships(autoTerminatedRelationships); |
| |
| parentGroup->addProcessor(std::move(processor)); |
| } |
| } |
| |
| void YamlConfiguration::parseRemoteProcessGroupYaml(const YAML::Node& rpgNode, core::ProcessGroup* parentGroup) { |
| utils::Identifier uuid; |
| std::string id; |
| |
| if (!parentGroup) { |
| logger_->log_error("parseRemoteProcessGroupYaml: no parent group exists"); |
| return; |
| } |
| |
| if (!rpgNode || !rpgNode.IsSequence()) { |
| return; |
| } |
| for (YAML::const_iterator iter = rpgNode.begin(); iter != rpgNode.end(); ++iter) { |
| auto currRpgNode = iter->as<YAML::Node>(); |
| |
| yaml::checkRequiredField(currRpgNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); |
| auto name = currRpgNode["name"].as<std::string>(); |
| id = getOrGenerateId(currRpgNode); |
| |
| logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id); |
| |
| auto urlNode = getOptionalField(currRpgNode, "url", YAML::Node(""), |
| CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); |
| |
| auto url = urlNode.as<std::string>(); |
| logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url); |
| |
| uuid = id; |
| auto group = this->createRemoteProcessGroup(name, uuid); |
| group->setParent(parentGroup); |
| |
| if (currRpgNode["yield period"]) { |
| auto yieldPeriod = currRpgNode["yield period"].as<std::string>(); |
| logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod); |
| |
| auto yield_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(yieldPeriod); |
| if (yield_period_value.has_value() && group) { |
| logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%" PRId64 "] ms", yield_period_value->count()); |
| group->setYieldPeriodMsec(*yield_period_value); |
| } |
| } |
| |
| if (currRpgNode["timeout"]) { |
| auto timeout = currRpgNode["timeout"].as<std::string>(); |
| logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout); |
| |
| auto timeout_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(timeout); |
| if (timeout_value.has_value() && group) { |
| logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%" PRId64 "] ms", timeout_value->count()); |
| group->setTimeout(timeout_value->count()); |
| } |
| } |
| |
| if (currRpgNode["local network interface"]) { |
| auto interface = currRpgNode["local network interface"].as<std::string>(); |
| logger_->log_debug("parseRemoteProcessGroupYaml: local network interface => [%s]", interface); |
| group->setInterface(interface); |
| } |
| |
| if (currRpgNode["transport protocol"]) { |
| auto transport_protocol = currRpgNode["transport protocol"].as<std::string>(); |
| logger_->log_debug("parseRemoteProcessGroupYaml: transport protocol => [%s]", transport_protocol); |
| if (transport_protocol == "HTTP") { |
| group->setTransportProtocol(transport_protocol); |
| if (currRpgNode["proxy host"]) { |
| auto http_proxy_host = currRpgNode["proxy host"].as<std::string>(); |
| logger_->log_debug("parseRemoteProcessGroupYaml: proxy host => [%s]", http_proxy_host); |
| group->setHttpProxyHost(http_proxy_host); |
| if (currRpgNode["proxy user"]) { |
| auto http_proxy_username = currRpgNode["proxy user"].as<std::string>(); |
| logger_->log_debug("parseRemoteProcessGroupYaml: proxy user => [%s]", http_proxy_username); |
| group->setHttpProxyUserName(http_proxy_username); |
| } |
| if (currRpgNode["proxy password"]) { |
| auto http_proxy_password = currRpgNode["proxy password"].as<std::string>(); |
| logger_->log_debug("parseRemoteProcessGroupYaml: proxy password => [%s]", http_proxy_password); |
| group->setHttpProxyPassWord(http_proxy_password); |
| } |
| if (currRpgNode["proxy port"]) { |
| auto http_proxy_port = currRpgNode["proxy port"].as<std::string>(); |
| int32_t port; |
| if (core::Property::StringToInt(http_proxy_port, port)) { |
| logger_->log_debug("parseRemoteProcessGroupYaml: proxy port => [%d]", port); |
| group->setHttpProxyPort(port); |
| } |
| } |
| } |
| } else if (transport_protocol == "RAW") { |
| group->setTransportProtocol(transport_protocol); |
| } else { |
| std::stringstream stream; |
| stream << "Invalid transport protocol " << transport_protocol; |
| throw minifi::Exception(ExceptionType::SITE2SITE_EXCEPTION, stream.str().c_str()); |
| } |
| } |
| |
| group->setTransmitting(true); |
| group->setURL(url); |
| |
| yaml::checkRequiredField(currRpgNode, "Input Ports", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); |
| auto inputPorts = currRpgNode["Input Ports"].as<YAML::Node>(); |
| if (inputPorts && inputPorts.IsSequence()) { |
| for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) { |
| auto currPort = portIter->as<YAML::Node>(); |
| |
| this->parsePortYaml(currPort, group.get(), sitetosite::SEND); |
| } // for node |
| } |
| auto outputPorts = currRpgNode["Output Ports"].as<YAML::Node>(); |
| if (outputPorts && outputPorts.IsSequence()) { |
| for (YAML::const_iterator portIter = outputPorts.begin(); portIter != outputPorts.end(); ++portIter) { |
| logger_->log_debug("Got a current port, iterating..."); |
| |
| auto currPort = portIter->as<YAML::Node>(); |
| |
| this->parsePortYaml(currPort, group.get(), sitetosite::RECEIVE); |
| } // for node |
| } |
| parentGroup->addProcessGroup(std::move(group)); |
| } |
| } |
| |
| void YamlConfiguration::parseProvenanceReportingYaml(const YAML::Node& reportNode, core::ProcessGroup* parentGroup) { |
| utils::Identifier port_uuid; |
| |
| if (!parentGroup) { |
| logger_->log_error("parseProvenanceReportingYaml: no parent group exists"); |
| return; |
| } |
| |
| if (!reportNode || !reportNode.IsDefined() || reportNode.IsNull()) { |
| logger_->log_debug("no provenance reporting task specified"); |
| return; |
| } |
| |
| auto reportTask = createProvenanceReportTask(); |
| |
| const auto node = reportNode.as<YAML::Node>(); |
| |
| yaml::checkRequiredField(node, "scheduling strategy", CONFIG_YAML_PROVENANCE_REPORT_KEY); |
| auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>(); |
| yaml::checkRequiredField(node, "scheduling period", CONFIG_YAML_PROVENANCE_REPORT_KEY); |
| auto schedulingPeriodStr = node["scheduling period"].as<std::string>(); |
| |
| if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(schedulingPeriodStr)) { |
| logger_->log_debug("ProvenanceReportingTask schedulingPeriod %" PRId64 " ns", scheduling_period->count()); |
| reportTask->setSchedulingPeriodNano(*scheduling_period); |
| } |
| |
| if (schedulingStrategyStr == "TIMER_DRIVEN") { |
| reportTask->setSchedulingStrategy(core::TIMER_DRIVEN); |
| logger_->log_debug("ProvenanceReportingTask scheduling strategy %s", schedulingStrategyStr); |
| } else { |
| throw std::invalid_argument("Invalid scheduling strategy " + schedulingStrategyStr); |
| } |
| |
| int64_t lvalue; |
| if (node["host"] && node["port"]) { |
| auto hostStr = node["host"].as<std::string>(); |
| |
| auto portStr = node["port"].as<std::string>(); |
| if (core::Property::StringToInt(portStr, lvalue) && !hostStr.empty()) { |
| logger_->log_debug("ProvenanceReportingTask port %" PRId64, lvalue); |
| std::string url = hostStr + ":" + portStr; |
| reportTask->setURL(url); |
| } |
| } |
| |
| if (node["url"]) { |
| auto urlStr = node["url"].as<std::string>(); |
| if (!urlStr.empty()) { |
| reportTask->setURL(urlStr); |
| logger_->log_debug("ProvenanceReportingTask URL %s", urlStr); |
| } |
| } |
| yaml::checkRequiredField(node, "port uuid", CONFIG_YAML_PROVENANCE_REPORT_KEY); |
| auto portUUIDStr = node["port uuid"].as<std::string>(); |
| yaml::checkRequiredField(node, "batch size", CONFIG_YAML_PROVENANCE_REPORT_KEY); |
| auto batchSizeStr = node["batch size"].as<std::string>(); |
| |
| logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr); |
| port_uuid = portUUIDStr; |
| reportTask->setPortUUID(port_uuid); |
| |
| if (core::Property::StringToInt(batchSizeStr, lvalue)) { |
| reportTask->setBatchSize(gsl::narrow<int>(lvalue)); |
| } |
| |
| reportTask->initialize(); |
| |
| // add processor to parent |
| reportTask->setScheduledState(core::RUNNING); |
| parentGroup->addProcessor(std::move(reportTask)); |
| } |
| |
| void YamlConfiguration::parseControllerServices(const YAML::Node& controllerServicesNode) { |
| if (!controllerServicesNode || !controllerServicesNode.IsSequence()) { |
| return; |
| } |
| for (const auto& iter : controllerServicesNode) { |
| const auto controllerServiceNode = iter.as<YAML::Node>(); |
| try { |
| yaml::checkRequiredField(controllerServiceNode, "name", CONFIG_YAML_CONTROLLER_SERVICES_KEY); |
| |
| auto type = yaml::getRequiredField(controllerServiceNode, std::vector<std::string>{"class", "type"}, CONFIG_YAML_CONTROLLER_SERVICES_KEY); |
| logger_->log_debug("Using type %s for controller service node", type); |
| |
| std::string fullType = type; |
| auto lastOfIdx = type.find_last_of('.'); |
| if (lastOfIdx != std::string::npos) { |
| lastOfIdx++; // if a value is found, increment to move beyond the . |
| type = type.substr(lastOfIdx); |
| } |
| |
| auto name = controllerServiceNode["name"].as<std::string>(); |
| auto id = getRequiredIdField(controllerServiceNode, CONFIG_YAML_CONTROLLER_SERVICES_KEY); |
| |
| utils::Identifier uuid; |
| uuid = id; |
| std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node = createControllerService(type, fullType, name, uuid); |
| if (nullptr != controller_service_node) { |
| logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name); |
| controller_service_node->initialize(); |
| YAML::Node propertiesNode = controllerServiceNode["Properties"]; |
| // we should propagate properties to the node and to the implementation |
| parsePropertiesNodeYaml(propertiesNode, *controller_service_node, name, CONFIG_YAML_CONTROLLER_SERVICES_KEY); |
| if (auto controllerServiceImpl = controller_service_node->getControllerServiceImplementation(); controllerServiceImpl) { |
| parsePropertiesNodeYaml(propertiesNode, *controllerServiceImpl, name, CONFIG_YAML_CONTROLLER_SERVICES_KEY); |
| } |
| } else { |
| logger_->log_debug("Could not locate %s", type); |
| } |
| controller_services_->put(id, controller_service_node); |
| controller_services_->put(name, controller_service_node); |
| } catch (YAML::InvalidNode &) { |
| throw Exception(ExceptionType::GENERAL_EXCEPTION, "Name, id, and class must be specified for controller services"); |
| } |
| } |
| } |
| |
| void YamlConfiguration::parseConnectionYaml(const YAML::Node& connectionsNode, core::ProcessGroup* parent) { |
| if (!parent) { |
| logger_->log_error("parseProcessNode: no parent group was provided"); |
| return; |
| } |
| if (!connectionsNode || !connectionsNode.IsSequence()) { |
| return; |
| } |
| |
| for (YAML::const_iterator iter = connectionsNode.begin(); iter != connectionsNode.end(); ++iter) { |
| const auto connectionNode = iter->as<YAML::Node>(); |
| |
| // Configure basic connection |
| const std::string id = getOrGenerateId(connectionNode); |
| |
| // Default name to be same as ID |
| // If name is specified in configuration, use the value |
| const auto name = connectionNode["name"].as<std::string>(id); |
| |
| const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] { |
| logger_->log_debug("Incorrect connection UUID format."); |
| throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect connection UUID format."); |
| }); |
| |
| auto connection = createConnection(name, uuid.value()); |
| logger_->log_debug("Created connection with UUID %s and name %s", id, name); |
| const yaml::YamlConnectionParser connectionParser(connectionNode, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_); |
| connectionParser.configureConnectionSourceRelationshipsFromYaml(*connection); |
| connection->setMaxQueueSize(connectionParser.getWorkQueueSizeFromYaml()); |
| connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSizeFromYaml()); |
| connection->setSwapThreshold(connectionParser.getSwapThresholdFromYaml()); |
| connection->setSourceUUID(connectionParser.getSourceUUIDFromYaml()); |
| connection->setDestinationUUID(connectionParser.getDestinationUUIDFromYaml()); |
| connection->setFlowExpirationDuration(connectionParser.getFlowFileExpirationFromYaml()); |
| connection->setDropEmptyFlowFiles(connectionParser.getDropEmptyFromYaml()); |
| |
| parent->addConnection(std::move(connection)); |
| } |
| } |
| |
| void YamlConfiguration::parsePortYaml(const YAML::Node& portNode, core::ProcessGroup* parent, sitetosite::TransferDirection direction) { |
| utils::Identifier uuid; |
| |
| if (!parent) { |
| logger_->log_error("parseProcessNode: no parent group existed"); |
| return; |
| } |
| |
| const auto inputPortsObj = portNode.as<YAML::Node>(); |
| |
| // Check for required fields |
| yaml::checkRequiredField(inputPortsObj, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); |
| auto nameStr = inputPortsObj["name"].as<std::string>(); |
| auto portId = getRequiredIdField(inputPortsObj, CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY, |
| "The field 'id' is required for " |
| "the port named '" + nameStr + "' in the YAML Config. If this port " |
| "is an input port for a NiFi Remote Process Group, the port " |
| "id should match the corresponding id specified in the NiFi configuration. " |
| "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX."); |
| uuid = portId; |
| |
| auto port = std::make_unique<minifi::RemoteProcessorGroupPort>( |
| stream_factory_, nameStr, parent->getURL(), this->configuration_, uuid); |
| port->setDirection(direction); |
| port->setTimeout(parent->getTimeout()); |
| port->setTransmitting(true); |
| port->setYieldPeriodMsec(parent->getYieldPeriodMsec()); |
| port->initialize(); |
| if (!parent->getInterface().empty()) |
| port->setInterface(parent->getInterface()); |
| if (parent->getTransportProtocol() == "HTTP") { |
| port->enableHTTP(); |
| if (!parent->getHttpProxyHost().empty()) |
| port->setHTTPProxy(parent->getHTTPProxy()); |
| } |
| // else defaults to RAW |
| |
| // handle port properties |
| const auto nodeVal = portNode.as<YAML::Node>(); |
| YAML::Node propertiesNode = nodeVal["Properties"]; |
| parsePropertiesNodeYaml(propertiesNode, *port, nameStr, CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); |
| |
| // add processor to parent |
| auto& processor = *port; |
| parent->addProcessor(std::move(port)); |
| processor.setScheduledState(core::RUNNING); |
| |
| if (inputPortsObj["max concurrent tasks"]) { |
| auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>(); |
| int32_t maxConcurrentTasks; |
| if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) { |
| processor.setMaxConcurrentTasks(maxConcurrentTasks); |
| } |
| logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); |
| processor.setMaxConcurrentTasks(maxConcurrentTasks); |
| } |
| } |
| |
| void YamlConfiguration::parsePropertyValueSequence(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& component) { |
| for (const auto& iter : propertyValueNode) { |
| if (iter.IsDefined()) { |
| const auto nodeVal = iter.as<YAML::Node>(); |
| YAML::Node propertiesNode = nodeVal["value"]; |
| // must insert the sequence in differently. |
| const auto rawValueString = propertiesNode.as<std::string>(); |
| logger_->log_debug("Found %s=%s", propertyName, rawValueString); |
| if (!component.updateProperty(propertyName, rawValueString)) { |
| auto proc = dynamic_cast<core::Connectable*>(&component); |
| if (proc) { |
| logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", propertyName, rawValueString, proc->getName()); |
| if (!component.setDynamicProperty(propertyName, rawValueString)) { |
| logger_->log_warn("Unable to set the dynamic property %s with value %s", propertyName, rawValueString); |
| } else { |
| logger_->log_warn("Dynamic property %s with value %s set", propertyName, rawValueString); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| PropertyValue YamlConfiguration::getValidatedProcessorPropertyForDefaultTypeInfo(const core::Property& propertyFromProcessor, const YAML::Node& propertyValueNode) { |
| PropertyValue defaultValue; |
| defaultValue = propertyFromProcessor.getDefaultValue(); |
| const std::type_index defaultType = defaultValue.getTypeInfo(); |
| try { |
| PropertyValue coercedValue = defaultValue; |
| if (defaultType == typeid(int64_t)) { |
| coercedValue = propertyValueNode.as<int64_t>(); |
| } else if (defaultType == typeid(uint64_t)) { |
| uint64_t integer_value; |
| if (YAML::convert<uint64_t>::decode(propertyValueNode, integer_value)) { |
| coercedValue = integer_value; |
| } else { |
| coercedValue = propertyValueNode.as<std::string>(); |
| } |
| } else if (defaultType == typeid(int)) { |
| coercedValue = propertyValueNode.as<int>(); |
| } else if (defaultType == typeid(bool)) { |
| coercedValue = propertyValueNode.as<bool>(); |
| } else { |
| coercedValue = propertyValueNode.as<std::string>(); |
| } |
| return coercedValue; |
| } catch (const std::exception& e) { |
| logger_->log_error("Fetching property failed with an exception of %s", e.what()); |
| logger_->log_error("Invalid conversion for field %s. Value %s", propertyFromProcessor.getName(), propertyValueNode.as<std::string>()); |
| } catch (...) { |
| logger_->log_error("Invalid conversion for field %s. Value %s", propertyFromProcessor.getName(), propertyValueNode.as<std::string>()); |
| } |
| return defaultValue; |
| } |
| |
| void YamlConfiguration::parseSingleProperty(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor) { |
| core::Property myProp(propertyName, "", ""); |
| processor.getProperty(propertyName, myProp); |
| const PropertyValue coercedValue = getValidatedProcessorPropertyForDefaultTypeInfo(myProp, propertyValueNode); |
| bool property_set = false; |
| try { |
| property_set = processor.setProperty(myProp, coercedValue); |
| } catch(const utils::internal::InvalidValueException&) { |
| auto component = dynamic_cast<core::CoreComponent*>(&processor); |
| if (component == nullptr) { |
| logger_->log_error("processor was not a CoreComponent for property '%s'", propertyName); |
| } else { |
| logger_->log_error("Invalid value was set for property '%s' creating component '%s'", propertyName, component->getName()); |
| } |
| throw; |
| } |
| const auto rawValueString = propertyValueNode.as<std::string>(); |
| if (!property_set) { |
| auto proc = dynamic_cast<core::Connectable*>(&processor); |
| if (proc) { |
| logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", propertyName, rawValueString, proc->getName()); |
| if (!processor.setDynamicProperty(propertyName, rawValueString)) { |
| logger_->log_warn("Unable to set the dynamic property %s with value %s", propertyName, rawValueString); |
| } else { |
| logger_->log_warn("Dynamic property %s with value %s set", propertyName, rawValueString); |
| } |
| } |
| } else { |
| logger_->log_debug("Property %s with value %s set", propertyName, rawValueString); |
| } |
| } |
| |
| void YamlConfiguration::parsePropertyNodeElement(const std::string& propertyName, const YAML::Node& propertyValueNode, core::ConfigurableComponent& processor) { |
| logger_->log_trace("Encountered %s", propertyName); |
| if (propertyValueNode.IsNull() || !propertyValueNode.IsDefined()) { |
| return; |
| } |
| if (propertyValueNode.IsSequence()) { |
| parsePropertyValueSequence(propertyName, propertyValueNode, processor); |
| } else { |
| parseSingleProperty(propertyName, propertyValueNode, processor); |
| } |
| } |
| |
| void YamlConfiguration::parsePropertiesNodeYaml(const YAML::Node& propertiesNode, core::ConfigurableComponent& component, const std::string& component_name, |
| const std::string& yaml_section) { |
| // Treat generically as a YAML node so we can perform inspection on entries to ensure they are populated |
| logger_->log_trace("Entered %s", component_name); |
| for (const auto& propertyElem : propertiesNode) { |
| const auto propertyName = propertyElem.first.as<std::string>(); |
| const YAML::Node propertyValueNode = propertyElem.second; |
| parsePropertyNodeElement(propertyName, propertyValueNode, component); |
| } |
| |
| validateComponentProperties(component, component_name, yaml_section); |
| } |
| |
| void YamlConfiguration::parseFunnelsYaml(const YAML::Node& node, core::ProcessGroup* parent) { |
| if (!parent) { |
| logger_->log_error("parseFunnelsYaml: no parent group was provided"); |
| return; |
| } |
| if (!node || !node.IsSequence()) { |
| return; |
| } |
| |
| for (const auto& element : node) { |
| const auto funnel_node = element.as<YAML::Node>(); |
| |
| std::string id = getOrGenerateId(funnel_node); |
| |
| // Default name to be same as ID |
| const auto name = funnel_node["name"].as<std::string>(id); |
| |
| const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] { |
| logger_->log_debug("Incorrect funnel UUID format."); |
| throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect funnel UUID format."); |
| }); |
| |
| auto funnel = std::make_unique<Funnel>(name, uuid.value()); |
| logger_->log_debug("Created funnel with UUID %s and name %s", id, name); |
| funnel->setScheduledState(core::RUNNING); |
| funnel->setSchedulingStrategy(core::EVENT_DRIVEN); |
| parent->addProcessor(std::move(funnel)); |
| } |
| } |
| |
| void YamlConfiguration::parsePorts(const YAML::Node& node, core::ProcessGroup* parent, PortType port_type) { |
| if (!parent) { |
| logger_->log_error("parsePorts: no parent group was provided"); |
| return; |
| } |
| if (!node || !node.IsSequence()) { |
| return; |
| } |
| |
| for (const auto& element : node) { |
| const auto port_node = element.as<YAML::Node>(); |
| |
| std::string id = getOrGenerateId(port_node); |
| |
| // Default name to be same as ID |
| const auto name = port_node["name"].as<std::string>(id); |
| |
| const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] { |
| logger_->log_debug("Incorrect port UUID format."); |
| throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect port UUID format."); |
| }); |
| |
| auto port = std::make_unique<Port>(name, uuid.value(), port_type); |
| logger_->log_debug("Created port UUID %s and name %s", id, name); |
| port->setScheduledState(core::RUNNING); |
| port->setSchedulingStrategy(core::EVENT_DRIVEN); |
| parent->addPort(std::move(port)); |
| } |
| } |
| |
| void YamlConfiguration::validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &yaml_section) const { |
| const auto &component_properties = component.getProperties(); |
| |
| // Validate required properties |
| for (const auto &prop_pair : component_properties) { |
| if (prop_pair.second.getRequired()) { |
| if (prop_pair.second.getValue().to_string().empty()) { |
| std::string reason = utils::StringUtils::join_pack("required property '", prop_pair.second.getName(), "' is not set"); |
| raiseComponentError(component_name, yaml_section, reason); |
| } else if (!prop_pair.second.getValue().validate(prop_pair.first).valid()) { |
| std::string reason = utils::StringUtils::join_pack("the value '", prop_pair.first, "' is not valid for property '", prop_pair.second.getName(), "'"); |
| raiseComponentError(component_name, yaml_section, reason); |
| } |
| } |
| } |
| |
| // Validate dependent properties |
| for (const auto &prop_pair : component_properties) { |
| const auto &dep_props = prop_pair.second.getDependentProperties(); |
| |
| if (prop_pair.second.getValue().to_string().empty()) { |
| continue; |
| } |
| |
| for (const auto &dep_prop_key : dep_props) { |
| if (component_properties.at(dep_prop_key).getValue().to_string().empty()) { |
| std::string reason = utils::StringUtils::join_pack("property '", prop_pair.second.getName(), |
| "' depends on property '", dep_prop_key, "' which is not set"); |
| raiseComponentError(component_name, yaml_section, reason); |
| } |
| } |
| } |
| |
| #ifdef YAML_CONFIGURATION_USE_REGEX |
| // Validate mutually-exclusive properties |
| for (const auto &prop_pair : component_properties) { |
| const auto &excl_props = prop_pair.second.getExclusiveOfProperties(); |
| |
| if (prop_pair.second.getValue().empty()) { |
| continue; |
| } |
| |
| for (const auto &excl_pair : excl_props) { |
| utils::Regex excl_expr(excl_pair.second); |
| if (utils::regexMatch(component_properties.at(excl_pair.first).getValue().to_string(), excl_expr)) { |
| std::string reason = utils::StringUtils::join_pack("property '", prop_pair.second.getName(), |
| "' must not be set when the value of property '", excl_pair.first, "' matches '", excl_pair.second, "'"); |
| raiseComponentError(component_name, yaml_section, reason); |
| } |
| } |
| } |
| |
| // Validate regex properties |
| for (const auto &prop_pair : component_properties) { |
| const auto &prop_regex_str = prop_pair.second.getValidRegex(); |
| |
| if (!prop_regex_str.empty()) { |
| utils::Regex prop_regex(prop_regex_str); |
| if (!utils::regexMatch(prop_pair.second.getValue().to_string(), prop_regex)) { |
| std::string reason = utils::StringUtils::join_pack("property '", prop_pair.second.getName(), "' does not match validation pattern '", prop_regex_str, "'"); |
| raiseComponentError(component_name, yaml_section, reason); |
| } |
| } |
| } |
| #else |
| logging::LOG_INFO(logger_) << "Validation of mutally-exclusive properties is disabled in this build."; |
| logging::LOG_INFO(logger_) << "Regex validation of properties is not available in this build."; |
| #endif // YAML_CONFIGURATION_USE_REGEX |
| } |
| |
| void YamlConfiguration::raiseComponentError(const std::string &component_name, const std::string &yaml_section, const std::string &reason) const { |
| std::string err_msg = "Unable to parse configuration file for component named '"; |
| err_msg.append(component_name); |
| err_msg.append("' because " + reason); |
| if (!yaml_section.empty()) { |
| err_msg.append(" [in '" + yaml_section + "' section of configuration file]"); |
| } |
| |
| logging::LOG_ERROR(logger_) << err_msg; |
| |
| throw std::invalid_argument(err_msg); |
| } |
| |
| std::string YamlConfiguration::getOrGenerateId(const YAML::Node& yamlNode, const std::string& idField) { |
| std::string id; |
| auto node = yamlNode.as<YAML::Node>(); |
| |
| if (node[idField]) { |
| if (YAML::NodeType::Scalar == node[idField].Type()) { |
| id = node[idField].as<std::string>(); |
| addNewId(id); |
| return id; |
| } |
| throw std::invalid_argument("getOrGenerateId: idField is expected to reference YAML::Node of YAML::NodeType::Scalar."); |
| } |
| |
| id = id_generator_->generate().to_string(); |
| logger_->log_debug("Generating random ID: id => [%s]", id); |
| return id; |
| } |
| |
| std::string YamlConfiguration::getRequiredIdField(const YAML::Node& yaml_node, std::string_view yaml_section, std::string_view error_message) { |
| yaml::checkRequiredField(yaml_node, "id", yaml_section, error_message); |
| auto id = yaml_node["id"].as<std::string>(); |
| addNewId(id); |
| return id; |
| } |
| |
| YAML::Node YamlConfiguration::getOptionalField(const YAML::Node& yamlNode, const std::string& fieldName, const YAML::Node& defaultValue, const std::string& yamlSection, |
| const std::string& providedInfoMessage) { |
| std::string infoMessage = providedInfoMessage; |
| auto result = yamlNode.as<YAML::Node>()[fieldName]; |
| if (!result) { |
| if (infoMessage.empty()) { |
| // Build a helpful info message for the user to inform them that a default is being used |
| infoMessage = |
| yamlNode.as<YAML::Node>()["name"] ? |
| "Using default value for optional field '" + fieldName + "' in component named '" + yamlNode.as<YAML::Node>()["name"].as<std::string>() + "'" : |
| "Using default value for optional field '" + fieldName + "' "; |
| if (!yamlSection.empty()) { |
| infoMessage += " [in '" + yamlSection + "' section of configuration file]: "; |
| } |
| |
| infoMessage += defaultValue.as<std::string>(); |
| } |
| logging::LOG_INFO(logger_) << infoMessage; |
| result = defaultValue; |
| } |
| |
| return result; |
| } |
| |
| void YamlConfiguration::addNewId(const std::string& uuid) { |
| const auto [_, success] = uuids_.insert(uuid); |
| if (!success) { |
| throw Exception(ExceptionType::GENERAL_EXCEPTION, "UUID " + uuid + " is duplicated in the flow configuration"); |
| } |
| } |
| |
| } // namespace org::apache::nifi::minifi::core |