MINIFI-294 Required vs. optional fields in config YAML
Long description of the changes in this commit can be found in the
JIRA for MINFI-294. Basically, better checks are now in place and
in the case of missing required fields, better error messaging.
This builds upon improvements made in MINIFI-275, and also
updates the config.yml exampe in the README.md file to match the
code.
This closes #90.
Signed-off-by: Aldrin Piri <aldrin@apache.org>
diff --git a/README.md b/README.md
index 7baefbc..ea73cd7 100644
--- a/README.md
+++ b/README.md
@@ -255,6 +255,7 @@
Connections:
- name: TransferFilesToRPG
id: 471deef6-2a6e-4a7d-912a-81cc17e3a207
+ source name: GetFile
source id: 471deef6-2a6e-4a7d-912a-81cc17e3a206
source relationship name: success
destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 0429f2c..489bdaa 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -69,13 +69,12 @@
YAML::Node procNode = iter->as<YAML::Node>();
checkRequiredField(&procNode, "name", CONFIG_YAML_PROCESSORS_KEY);
- checkRequiredField(&procNode, "class", CONFIG_YAML_PROCESSORS_KEY);
-
procCfg.name = procNode["name"].as<std::string>();
procCfg.id = getOrGenerateId(&procNode);
uuid_parse(procCfg.id.c_str(), uuid);
logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]",
procCfg.name, procCfg.id);
+ checkRequiredField(&procNode, "class", CONFIG_YAML_PROCESSORS_KEY);
procCfg.javaClass = procNode["class"].as<std::string>();
logger_->log_debug("parseProcessorNode: class => [%s]",
procCfg.javaClass);
@@ -98,79 +97,73 @@
}
processor->setName(procCfg.name);
- procCfg.maxConcurrentTasks = procNode["max concurrent tasks"]
- .as<std::string>();
- logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]",
- procCfg.maxConcurrentTasks);
-
+ checkRequiredField(&procNode, "scheduling strategy", CONFIG_YAML_PROCESSORS_KEY);
procCfg.schedulingStrategy = procNode["scheduling strategy"].as<std::string>();
- logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]",
- procCfg.schedulingStrategy);
+ logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
+ checkRequiredField(&procNode, "scheduling period", CONFIG_YAML_PROCESSORS_KEY);
procCfg.schedulingPeriod = procNode["scheduling period"].as<std::string>();
- logger_->log_debug("parseProcessorNode: scheduling period => [%s]",
- procCfg.schedulingPeriod);
+ logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
- procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
- logger_->log_debug("parseProcessorNode: penalization period => [%s]",
- procCfg.penalizationPeriod);
+ if (procNode["max concurrent tasks"]) {
+ procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>();
+ logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks);
+ }
- procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
- logger_->log_debug("parseProcessorNode: yield period => [%s]",
- procCfg.yieldPeriod);
+ if (procNode["penalization period"]) {
+ procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
+ logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod);
+ }
- procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>();
- logger_->log_debug("parseProcessorNode: run duration nanos => [%s]",
- procCfg.runDurationNanos);
+ if (procNode["yield period"]) {
+ procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
+ logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod);
+ }
+
+ if (procNode["run duration nanos"]) {
+ procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>();
+ logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos);
+ }
// handle auto-terminated relationships
- YAML::Node autoTerminatedSequence =
- procNode["auto-terminated relationships list"];
- std::vector<std::string> rawAutoTerminatedRelationshipValues;
- if (autoTerminatedSequence.IsSequence()
- && !autoTerminatedSequence.IsNull()
- && autoTerminatedSequence.size() > 0) {
- for (YAML::const_iterator relIter = autoTerminatedSequence.begin();
- relIter != autoTerminatedSequence.end(); ++relIter) {
- std::string autoTerminatedRel = relIter->as<std::string>();
- rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
+ if (procNode["auto-terminated relationships list"]) {
+ YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
+ std::vector<std::string> rawAutoTerminatedRelationshipValues;
+ if (autoTerminatedSequence.IsSequence()
+ && !autoTerminatedSequence.IsNull()
+ && autoTerminatedSequence.size() > 0) {
+ for (YAML::const_iterator relIter = autoTerminatedSequence.begin();
+ relIter != autoTerminatedSequence.end(); ++relIter) {
+ std::string autoTerminatedRel = relIter->as<std::string>();
+ rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
+ }
}
+ procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
}
- procCfg.autoTerminatedRelationships =
- rawAutoTerminatedRelationshipValues;
// handle processor properties
- YAML::Node propertiesNode = procNode["Properties"];
- parsePropertiesNodeYaml(&propertiesNode, processor);
+ if (procNode["Properties"]) {
+ YAML::Node propertiesNode = procNode["Properties"];
+ parsePropertiesNodeYaml(&propertiesNode, processor);
+ }
// Take care of scheduling
core::TimeUnit unit;
- if (core::Property::StringToTime(procCfg.schedulingPeriod,
- schedulingPeriod, unit)
- && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit,
- schedulingPeriod)) {
- logger_->log_debug(
- "convert: parseProcessorNode: schedulingPeriod => [%d] ns",
- schedulingPeriod);
+ if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit)
+ && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+ logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod);
processor->setSchedulingPeriodNano(schedulingPeriod);
}
- if (core::Property::StringToTime(procCfg.penalizationPeriod,
- penalizationPeriod, unit)
- && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit,
- penalizationPeriod)) {
- logger_->log_debug(
- "convert: parseProcessorNode: penalizationPeriod => [%d] ms",
- penalizationPeriod);
+ if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit)
+ && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
+ logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms", penalizationPeriod);
processor->setPenalizationPeriodMsec(penalizationPeriod);
}
if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit)
- && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit,
- yieldPeriod)) {
- logger_->log_debug(
- "convert: parseProcessorNode: yieldPeriod => [%d] ms",
- yieldPeriod);
+ && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
+ logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
processor->setYieldPeriodMsec(yieldPeriod);
}
@@ -189,26 +182,20 @@
}
int64_t maxConcurrentTasks;
- if (core::Property::StringToInt(procCfg.maxConcurrentTasks,
- maxConcurrentTasks)) {
- logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]",
- maxConcurrentTasks);
+ if (core::Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) {
+ logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
processor->setMaxConcurrentTasks((uint8_t) maxConcurrentTasks);
}
- if (core::Property::StringToInt(procCfg.runDurationNanos,
- runDurationNanos)) {
- logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]",
- runDurationNanos);
+ if (core::Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
+ logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
processor->setRunDurationNano((uint64_t) runDurationNanos);
}
std::set<core::Relationship> autoTerminatedRelationships;
for (auto &&relString : procCfg.autoTerminatedRelationships) {
core::Relationship relationship(relString, "");
- logger_->log_debug(
- "parseProcessorNode: autoTerminatedRelationship => [%s]",
- relString);
+ logger_->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", relString);
autoTerminatedRelationships.insert(relationship);
}
@@ -240,59 +227,58 @@
++iter) {
YAML::Node currRpgNode = iter->as<YAML::Node>();
+ checkRequiredField(&currRpgNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
auto name = currRpgNode["name"].as<std::string>();
id = getOrGenerateId(&currRpgNode);
- logger_->log_debug(
- "parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id);
+ logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id);
+ checkRequiredField(&currRpgNode, "url", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
std::string url = currRpgNode["url"].as<std::string>();
logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url);
- std::string timeout = currRpgNode["timeout"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]",
- timeout);
-
- std::string yieldPeriod = currRpgNode["yield period"].as<std::string>();
- logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]",
- yieldPeriod);
-
- YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
- YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
core::ProcessGroup *group = NULL;
-
- uuid_parse(id.c_str(), uuid);
-
+ core::TimeUnit unit;
int64_t timeoutValue = -1;
int64_t yieldPeriodValue = -1;
-
+ uuid_parse(id.c_str(), uuid);
group = this->createRemoteProcessGroup(name.c_str(), uuid).release();
group->setParent(parentGroup);
parentGroup->addProcessGroup(group);
- core::TimeUnit unit;
+ if (currRpgNode["yield period"]) {
+ std::string yieldPeriod = currRpgNode["yield period"].as<std::string>();
+ logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod);
- if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
- && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit,
- yieldPeriodValue) && group) {
- logger_->log_debug(
- "parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms",
- yieldPeriodValue);
- group->setYieldPeriodMsec(yieldPeriodValue);
+ if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
+ && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit,
+ yieldPeriodValue) && group) {
+ logger_->log_debug(
+ "parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms",
+ yieldPeriodValue);
+ group->setYieldPeriodMsec(yieldPeriodValue);
+ }
}
- if (core::Property::StringToTime(timeout, timeoutValue, unit)
- && core::Property::ConvertTimeUnitToMS(timeoutValue, unit,
- timeoutValue) && group) {
- logger_->log_debug(
- "parseRemoteProcessGroupYaml: timeoutValue => [%d] ms",
- timeoutValue);
- group->setTimeOut(timeoutValue);
+ if (currRpgNode["timeout"]) {
+ std::string timeout = currRpgNode["timeout"].as<std::string>();
+ logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout);
+
+ if (core::Property::StringToTime(timeout, timeoutValue, unit)
+ && core::Property::ConvertTimeUnitToMS(timeoutValue, unit,
+ timeoutValue) && group) {
+ logger_->log_debug(
+ "parseRemoteProcessGroupYaml: timeoutValue => [%d] ms",
+ timeoutValue);
+ group->setTimeOut(timeoutValue);
+ }
}
group->setTransmitting(true);
group->setURL(url);
+ checkRequiredField(&currRpgNode, "Input Ports", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+ YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
if (inputPorts && inputPorts.IsSequence()) {
for (YAML::const_iterator portIter = inputPorts.begin();
portIter != inputPorts.end(); ++portIter) {
@@ -303,6 +289,7 @@
this->parsePortYaml(&currPort, group, SEND);
} // for node
}
+ YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
if (outputPorts && outputPorts.IsSequence()) {
for (YAML::const_iterator portIter = outputPorts.begin();
portIter != outputPorts.end(); ++portIter) {
@@ -408,6 +395,7 @@
// Configure basic connection
uuid_t uuid;
+ checkRequiredField(&connectionNode, "name", CONFIG_YAML_CONNECTIONS_KEY);
std::string name = connectionNode["name"].as<std::string>();
std::string id = getOrGenerateId(&connectionNode);
uuid_parse(id.c_str(), uuid);
@@ -417,44 +405,44 @@
// Configure connection source
- auto rawRelationship = connectionNode["source relationship name"]
- .as<std::string>();
+ checkRequiredField(&connectionNode, "source relationship name", CONFIG_YAML_CONNECTIONS_KEY);
+ auto rawRelationship = connectionNode["source relationship name"].as<std::string>();
core::Relationship relationship(rawRelationship, "");
- logger_->log_debug(
- "parseConnection: relationship => [%s]", rawRelationship);
+ logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
if (connection) {
connection->setRelationship(relationship);
}
uuid_t srcUUID;
- std::string connectionSrcProcName = connectionNode["source name"]
- .as<std::string>();
- if (connectionNode["source id"]) {
- std::string connectionSrcProcId = connectionNode["source id"]
- .as<std::string>();
+ if (connectionNode["source id"]) {
+ std::string connectionSrcProcId = connectionNode["source id"].as<std::string>();
uuid_parse(connectionSrcProcId.c_str(), srcUUID);
+ logger_->log_debug("Using 'source id' to match source with same id for "
+ "connection '%s': source id => [%s]", name, connectionSrcProcId);
} else {
- // if we don't have a source id, try harder to resolve the source processor.
- // config schema v2 will make this unnecessary
+ // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
+ checkRequiredField(&connectionNode, "source name", CONFIG_YAML_CONNECTIONS_KEY);
+ std::string connectionSrcProcName = connectionNode["source name"].as<std::string>();
uuid_t tmpUUID;
- if (!uuid_parse(connectionSrcProcName.c_str(), tmpUUID) &&
- NULL != parent->findProcessor(tmpUUID)) {
+ if (!uuid_parse(connectionSrcProcName.c_str(), tmpUUID) && NULL != parent->findProcessor(tmpUUID)) {
// the source name is a remote port id, so use that as the source id
uuid_copy(srcUUID, tmpUUID);
+ logger_->log_debug("Using 'source name' containing a remote port id to match the source for "
+ "connection '%s': source name => [%s]", name, connectionSrcProcName);
} else {
// lastly, look the processor up by name
auto srcProcessor = parent->findProcessor(connectionSrcProcName);
if (NULL != srcProcessor) {
srcProcessor->getUUID(srcUUID);
+ logger_->log_debug("Using 'source name' to match source with same name for "
+ "connection '%s': source name => [%s]", name, connectionSrcProcName);
} else {
// we ran out of ways to discover the source processor
logger_->log_error(
- "Could not locate a source with name %s to create a connection",
- connectionSrcProcName);
+ "Could not locate a source with name %s to create a connection", connectionSrcProcName);
throw std::invalid_argument(
- "Could not locate a source with name " +
- connectionSrcProcName + " to create a connection ");
+ "Could not locate a source with name " + connectionSrcProcName + " to create a connection ");
}
}
}
@@ -462,33 +450,36 @@
// Configure connection destination
uuid_t destUUID;
- std::string connectionDestProcName = connectionNode["destination name"]
- .as<std::string>();
if (connectionNode["destination id"]) {
- std::string connectionDestProcId = connectionNode["destination id"]
- .as<std::string>();
+ std::string connectionDestProcId = connectionNode["destination id"].as<std::string>();
uuid_parse(connectionDestProcId.c_str(), destUUID);
+ logger_->log_debug("Using 'destination id' to match destination with same id for "
+ "connection '%s': destination id => [%s]", name, connectionDestProcId);
} else {
// we use the same logic as above for resolving the source processor
// for looking up the destination processor in absence of a processor id
+ checkRequiredField(&connectionNode, "destination name", CONFIG_YAML_CONNECTIONS_KEY);
+ std::string connectionDestProcName = connectionNode["destination name"].as<std::string>();
uuid_t tmpUUID;
if (!uuid_parse(connectionDestProcName.c_str(), tmpUUID) &&
NULL != parent->findProcessor(tmpUUID)) {
// the destination name is a remote port id, so use that as the dest id
uuid_copy(destUUID, tmpUUID);
+ logger_->log_debug("Using 'destination name' containing a remote port id to match the destination for "
+ "connection '%s': destination name => [%s]", name, connectionDestProcName);
} else {
// look the processor up by name
auto destProcessor = parent->findProcessor(connectionDestProcName);
if (NULL != destProcessor) {
destProcessor->getUUID(destUUID);
+ logger_->log_debug("Using 'destination name' to match destination with same name for "
+ "connection '%s': destination name => [%s]", name, connectionDestProcName);
} else {
// we ran out of ways to discover the destination processor
logger_->log_error(
- "Could not locate a destination with name %s to create a connection",
- connectionDestProcName);
+ "Could not locate a destination with name %s to create a connection", connectionDestProcName);
throw std::invalid_argument(
- "Could not locate a destination with name " +
- connectionDestProcName + " to create a connection");
+ "Could not locate a destination with name " + connectionDestProcName + " to create a connection");
}
}
}
@@ -539,21 +530,21 @@
// handle port properties
YAML::Node nodeVal = portNode->as<YAML::Node>();
YAML::Node propertiesNode = nodeVal["Properties"];
-
parsePropertiesNodeYaml(&propertiesNode, processor);
// add processor to parent
parent->addProcessor(processor);
processor->setScheduledState(core::RUNNING);
- auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"]
- .as<std::string>();
- int64_t maxConcurrentTasks;
- if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
+
+ if (inputPortsObj["max concurrent tasks"]) {
+ auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>();
+ int64_t maxConcurrentTasks;
+ if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
+ processor->setMaxConcurrentTasks(maxConcurrentTasks);
+ }
+ logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
processor->setMaxConcurrentTasks(maxConcurrentTasks);
}
- logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]",
- maxConcurrentTasks);
- processor->setMaxConcurrentTasks(maxConcurrentTasks);
}
void YamlConfiguration::parsePropertiesNodeYaml(
@@ -624,8 +615,8 @@
"' section of configuration file]";
}
}
- logger_->log_error(errorMessage.c_str());
- throw std::invalid_argument(errorMessage);
+ logger_->log_error(errMsg.c_str());
+ throw std::invalid_argument(errMsg);
}
}