MINIFI-294 Required vs. optional fields in config YAML

Long description of the changes in this commit can be found in the
JIRA for MINFI-294. Basically, better checks are now in place and
in the case of missing required fields, better error messaging.
This builds upon improvements made in MINIFI-275, and also
updates the config.yml exampe in the README.md file to match the
code.

This closes #90.

Signed-off-by: Aldrin Piri <aldrin@apache.org>
diff --git a/README.md b/README.md
index 7baefbc..ea73cd7 100644
--- a/README.md
+++ b/README.md
@@ -255,6 +255,7 @@
     Connections:
         - name: TransferFilesToRPG
           id: 471deef6-2a6e-4a7d-912a-81cc17e3a207
+          source name: GetFile
           source id: 471deef6-2a6e-4a7d-912a-81cc17e3a206 
           source relationship name: success
           destination id: 471deef6-2a6e-4a7d-912a-81cc17e3a204
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 0429f2c..489bdaa 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -69,13 +69,12 @@
         YAML::Node procNode = iter->as<YAML::Node>();
 
         checkRequiredField(&procNode, "name", CONFIG_YAML_PROCESSORS_KEY);
-        checkRequiredField(&procNode, "class", CONFIG_YAML_PROCESSORS_KEY);
-
         procCfg.name = procNode["name"].as<std::string>();
         procCfg.id = getOrGenerateId(&procNode);
         uuid_parse(procCfg.id.c_str(), uuid);
         logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]",
                            procCfg.name, procCfg.id);
+        checkRequiredField(&procNode, "class", CONFIG_YAML_PROCESSORS_KEY);
         procCfg.javaClass = procNode["class"].as<std::string>();
         logger_->log_debug("parseProcessorNode: class => [%s]",
                            procCfg.javaClass);
@@ -98,79 +97,73 @@
         }
         processor->setName(procCfg.name);
 
-        procCfg.maxConcurrentTasks = procNode["max concurrent tasks"]
-            .as<std::string>();
-        logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]",
-                           procCfg.maxConcurrentTasks);
-
+        checkRequiredField(&procNode, "scheduling strategy", CONFIG_YAML_PROCESSORS_KEY);
         procCfg.schedulingStrategy = procNode["scheduling strategy"].as<std::string>();
-        logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]",
-                           procCfg.schedulingStrategy);
+        logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);
 
+        checkRequiredField(&procNode, "scheduling period", CONFIG_YAML_PROCESSORS_KEY);
         procCfg.schedulingPeriod = procNode["scheduling period"].as<std::string>();
-        logger_->log_debug("parseProcessorNode: scheduling period => [%s]",
-                           procCfg.schedulingPeriod);
+        logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);
 
-        procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
-        logger_->log_debug("parseProcessorNode: penalization period => [%s]",
-                           procCfg.penalizationPeriod);
+        if (procNode["max concurrent tasks"]) {
+          procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>();
+          logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks);
+        }
 
-        procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
-        logger_->log_debug("parseProcessorNode: yield period => [%s]",
-                           procCfg.yieldPeriod);
+        if (procNode["penalization period"]) {
+          procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>();
+          logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod);
+        }
 
-        procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>();
-        logger_->log_debug("parseProcessorNode: run duration nanos => [%s]",
-                           procCfg.runDurationNanos);
+        if (procNode["yield period"]) {
+          procCfg.yieldPeriod = procNode["yield period"].as<std::string>();
+          logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod);
+        }
+
+        if (procNode["run duration nanos"]) {
+          procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>();
+          logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos);
+        }
 
         // handle auto-terminated relationships
-        YAML::Node autoTerminatedSequence =
-            procNode["auto-terminated relationships list"];
-        std::vector<std::string> rawAutoTerminatedRelationshipValues;
-        if (autoTerminatedSequence.IsSequence()
-            && !autoTerminatedSequence.IsNull()
-            && autoTerminatedSequence.size() > 0) {
-          for (YAML::const_iterator relIter = autoTerminatedSequence.begin();
-              relIter != autoTerminatedSequence.end(); ++relIter) {
-            std::string autoTerminatedRel = relIter->as<std::string>();
-            rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
+        if (procNode["auto-terminated relationships list"]) {
+          YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"];
+          std::vector<std::string> rawAutoTerminatedRelationshipValues;
+          if (autoTerminatedSequence.IsSequence()
+              && !autoTerminatedSequence.IsNull()
+              && autoTerminatedSequence.size() > 0) {
+            for (YAML::const_iterator relIter = autoTerminatedSequence.begin();
+                 relIter != autoTerminatedSequence.end(); ++relIter) {
+              std::string autoTerminatedRel = relIter->as<std::string>();
+              rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
+            }
           }
+          procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
         }
-        procCfg.autoTerminatedRelationships =
-            rawAutoTerminatedRelationshipValues;
 
         // handle processor properties
-        YAML::Node propertiesNode = procNode["Properties"];
-        parsePropertiesNodeYaml(&propertiesNode, processor);
+        if (procNode["Properties"]) {
+          YAML::Node propertiesNode = procNode["Properties"];
+          parsePropertiesNodeYaml(&propertiesNode, processor);
+        }
 
         // Take care of scheduling
         core::TimeUnit unit;
-        if (core::Property::StringToTime(procCfg.schedulingPeriod,
-                                         schedulingPeriod, unit)
-            && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit,
-                                                   schedulingPeriod)) {
-          logger_->log_debug(
-              "convert: parseProcessorNode: schedulingPeriod => [%d] ns",
-              schedulingPeriod);
+        if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit)
+            && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
+          logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod);
           processor->setSchedulingPeriodNano(schedulingPeriod);
         }
 
-        if (core::Property::StringToTime(procCfg.penalizationPeriod,
-                                         penalizationPeriod, unit)
-            && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit,
-                                                   penalizationPeriod)) {
-          logger_->log_debug(
-              "convert: parseProcessorNode: penalizationPeriod => [%d] ms",
-              penalizationPeriod);
+        if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit)
+            && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
+          logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms", penalizationPeriod);
           processor->setPenalizationPeriodMsec(penalizationPeriod);
         }
 
         if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit)
-            && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit,
-                                                   yieldPeriod)) {
-          logger_->log_debug(
-              "convert: parseProcessorNode: yieldPeriod => [%d] ms",
-              yieldPeriod);
+            && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
+          logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
           processor->setYieldPeriodMsec(yieldPeriod);
         }
 
@@ -189,26 +182,20 @@
         }
 
         int64_t maxConcurrentTasks;
-        if (core::Property::StringToInt(procCfg.maxConcurrentTasks,
-                                        maxConcurrentTasks)) {
-          logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]",
-                             maxConcurrentTasks);
+        if (core::Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) {
+          logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
           processor->setMaxConcurrentTasks((uint8_t) maxConcurrentTasks);
         }
 
-        if (core::Property::StringToInt(procCfg.runDurationNanos,
-                                        runDurationNanos)) {
-          logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]",
-                             runDurationNanos);
+        if (core::Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
+          logger_->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos);
           processor->setRunDurationNano((uint64_t) runDurationNanos);
         }
 
         std::set<core::Relationship> autoTerminatedRelationships;
         for (auto &&relString : procCfg.autoTerminatedRelationships) {
           core::Relationship relationship(relString, "");
-          logger_->log_debug(
-              "parseProcessorNode: autoTerminatedRelationship  => [%s]",
-              relString);
+          logger_->log_debug("parseProcessorNode: autoTerminatedRelationship  => [%s]", relString);
           autoTerminatedRelationships.insert(relationship);
         }
 
@@ -240,59 +227,58 @@
           ++iter) {
         YAML::Node currRpgNode = iter->as<YAML::Node>();
 
+        checkRequiredField(&currRpgNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
         auto name = currRpgNode["name"].as<std::string>();
         id = getOrGenerateId(&currRpgNode);
 
-        logger_->log_debug(
-            "parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id);
+        logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id);
 
+        checkRequiredField(&currRpgNode, "url", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
         std::string url = currRpgNode["url"].as<std::string>();
         logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url);
 
-        std::string timeout = currRpgNode["timeout"].as<std::string>();
-        logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]",
-                           timeout);
-
-        std::string yieldPeriod = currRpgNode["yield period"].as<std::string>();
-        logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]",
-                           yieldPeriod);
-
-        YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
-        YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
         core::ProcessGroup *group = NULL;
-
-        uuid_parse(id.c_str(), uuid);
-
+        core::TimeUnit unit;
         int64_t timeoutValue = -1;
         int64_t yieldPeriodValue = -1;
-
+        uuid_parse(id.c_str(), uuid);
         group = this->createRemoteProcessGroup(name.c_str(), uuid).release();
         group->setParent(parentGroup);
         parentGroup->addProcessGroup(group);
 
-        core::TimeUnit unit;
+        if (currRpgNode["yield period"]) {
+          std::string yieldPeriod = currRpgNode["yield period"].as<std::string>();
+          logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod);
 
-        if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
-            && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit,
-                                                   yieldPeriodValue) && group) {
-          logger_->log_debug(
-              "parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms",
-              yieldPeriodValue);
-          group->setYieldPeriodMsec(yieldPeriodValue);
+          if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit)
+              && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit,
+                                                     yieldPeriodValue) && group) {
+            logger_->log_debug(
+                "parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms",
+                yieldPeriodValue);
+            group->setYieldPeriodMsec(yieldPeriodValue);
+          }
         }
 
-        if (core::Property::StringToTime(timeout, timeoutValue, unit)
-            && core::Property::ConvertTimeUnitToMS(timeoutValue, unit,
-                                                   timeoutValue) && group) {
-          logger_->log_debug(
-              "parseRemoteProcessGroupYaml: timeoutValue => [%d] ms",
-              timeoutValue);
-          group->setTimeOut(timeoutValue);
+        if (currRpgNode["timeout"]) {
+          std::string timeout = currRpgNode["timeout"].as<std::string>();
+          logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout);
+
+          if (core::Property::StringToTime(timeout, timeoutValue, unit)
+              && core::Property::ConvertTimeUnitToMS(timeoutValue, unit,
+                                                     timeoutValue) && group) {
+            logger_->log_debug(
+                "parseRemoteProcessGroupYaml: timeoutValue => [%d] ms",
+                timeoutValue);
+            group->setTimeOut(timeoutValue);
+          }
         }
 
         group->setTransmitting(true);
         group->setURL(url);
 
+        checkRequiredField(&currRpgNode, "Input Ports", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
+        YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
         if (inputPorts && inputPorts.IsSequence()) {
           for (YAML::const_iterator portIter = inputPorts.begin();
               portIter != inputPorts.end(); ++portIter) {
@@ -303,6 +289,7 @@
             this->parsePortYaml(&currPort, group, SEND);
           }  // for node
         }
+        YAML::Node outputPorts = currRpgNode["Output Ports"].as<YAML::Node>();
         if (outputPorts && outputPorts.IsSequence()) {
           for (YAML::const_iterator portIter = outputPorts.begin();
               portIter != outputPorts.end(); ++portIter) {
@@ -408,6 +395,7 @@
 
         // Configure basic connection
         uuid_t uuid;
+        checkRequiredField(&connectionNode, "name", CONFIG_YAML_CONNECTIONS_KEY);
         std::string name = connectionNode["name"].as<std::string>();
         std::string id = getOrGenerateId(&connectionNode);
         uuid_parse(id.c_str(), uuid);
@@ -417,44 +405,44 @@
 
 
         // Configure connection source
-        auto rawRelationship = connectionNode["source relationship name"]
-            .as<std::string>();
+        checkRequiredField(&connectionNode, "source relationship name", CONFIG_YAML_CONNECTIONS_KEY);
+        auto rawRelationship = connectionNode["source relationship name"].as<std::string>();
         core::Relationship relationship(rawRelationship, "");
-        logger_->log_debug(
-            "parseConnection: relationship => [%s]", rawRelationship);
+        logger_->log_debug("parseConnection: relationship => [%s]", rawRelationship);
         if (connection) {
           connection->setRelationship(relationship);
         }
 
         uuid_t srcUUID;
-        std::string connectionSrcProcName = connectionNode["source name"]
-            .as<std::string>();
-        if (connectionNode["source id"]) {
-          std::string connectionSrcProcId = connectionNode["source id"]
-              .as<std::string>();
 
+        if (connectionNode["source id"]) {
+          std::string connectionSrcProcId = connectionNode["source id"].as<std::string>();
           uuid_parse(connectionSrcProcId.c_str(), srcUUID);
+          logger_->log_debug("Using 'source id' to match source with same id for "
+                                 "connection '%s': source id => [%s]", name, connectionSrcProcId);
         } else {
-          // if we don't have a source id, try harder to resolve the source processor.
-          // config schema v2 will make this unnecessary
+          // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
+          checkRequiredField(&connectionNode, "source name", CONFIG_YAML_CONNECTIONS_KEY);
+          std::string connectionSrcProcName = connectionNode["source name"].as<std::string>();
           uuid_t tmpUUID;
-          if (!uuid_parse(connectionSrcProcName.c_str(), tmpUUID) &&
-              NULL != parent->findProcessor(tmpUUID)) {
+          if (!uuid_parse(connectionSrcProcName.c_str(), tmpUUID) && NULL != parent->findProcessor(tmpUUID)) {
             // the source name is a remote port id, so use that as the source id
             uuid_copy(srcUUID, tmpUUID);
+            logger_->log_debug("Using 'source name' containing a remote port id to match the source for "
+                                   "connection '%s': source name => [%s]", name, connectionSrcProcName);
           } else {
             // lastly, look the processor up by name
             auto srcProcessor = parent->findProcessor(connectionSrcProcName);
             if (NULL != srcProcessor) {
               srcProcessor->getUUID(srcUUID);
+              logger_->log_debug("Using 'source name' to match source with same name for "
+                                     "connection '%s': source name => [%s]", name, connectionSrcProcName);
             } else {
               // we ran out of ways to discover the source processor
               logger_->log_error(
-                  "Could not locate a source with name %s to create a connection",
-                  connectionSrcProcName);
+                  "Could not locate a source with name %s to create a connection", connectionSrcProcName);
               throw std::invalid_argument(
-                  "Could not locate a source with name " +
-                      connectionSrcProcName + " to create a connection ");
+                  "Could not locate a source with name " + connectionSrcProcName + " to create a connection ");
             }
           }
         }
@@ -462,33 +450,36 @@
 
         // Configure connection destination
         uuid_t destUUID;
-        std::string connectionDestProcName = connectionNode["destination name"]
-            .as<std::string>();
         if (connectionNode["destination id"]) {
-          std::string connectionDestProcId = connectionNode["destination id"]
-              .as<std::string>();
+          std::string connectionDestProcId = connectionNode["destination id"].as<std::string>();
           uuid_parse(connectionDestProcId.c_str(), destUUID);
+          logger_->log_debug("Using 'destination id' to match destination with same id for "
+                                 "connection '%s': destination id => [%s]", name, connectionDestProcId);
         } else {
           // we use the same logic as above for resolving the source processor
           // for looking up the destination processor in absence of a processor id
+          checkRequiredField(&connectionNode, "destination name", CONFIG_YAML_CONNECTIONS_KEY);
+          std::string connectionDestProcName = connectionNode["destination name"].as<std::string>();
           uuid_t tmpUUID;
           if (!uuid_parse(connectionDestProcName.c_str(), tmpUUID) &&
               NULL != parent->findProcessor(tmpUUID)) {
             // the destination name is a remote port id, so use that as the dest id
             uuid_copy(destUUID, tmpUUID);
+            logger_->log_debug("Using 'destination name' containing a remote port id to match the destination for "
+                                   "connection '%s': destination name => [%s]", name, connectionDestProcName);
           } else {
             // look the processor up by name
             auto destProcessor = parent->findProcessor(connectionDestProcName);
             if (NULL != destProcessor) {
               destProcessor->getUUID(destUUID);
+              logger_->log_debug("Using 'destination name' to match destination with same name for "
+                                     "connection '%s': destination name => [%s]", name, connectionDestProcName);
             } else {
               // we ran out of ways to discover the destination processor
               logger_->log_error(
-                  "Could not locate a destination with name %s to create a connection",
-                  connectionDestProcName);
+                  "Could not locate a destination with name %s to create a connection", connectionDestProcName);
               throw std::invalid_argument(
-                  "Could not locate a destination with name " +
-                      connectionDestProcName + " to create a connection");
+                  "Could not locate a destination with name " + connectionDestProcName + " to create a connection");
             }
           }
         }
@@ -539,21 +530,21 @@
   // handle port properties
   YAML::Node nodeVal = portNode->as<YAML::Node>();
   YAML::Node propertiesNode = nodeVal["Properties"];
-
   parsePropertiesNodeYaml(&propertiesNode, processor);
 
   // add processor to parent
   parent->addProcessor(processor);
   processor->setScheduledState(core::RUNNING);
-  auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"]
-      .as<std::string>();
-  int64_t maxConcurrentTasks;
-  if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
+
+  if (inputPortsObj["max concurrent tasks"]) {
+    auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>();
+    int64_t maxConcurrentTasks;
+    if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
+      processor->setMaxConcurrentTasks(maxConcurrentTasks);
+    }
+    logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
     processor->setMaxConcurrentTasks(maxConcurrentTasks);
   }
-  logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]",
-                     maxConcurrentTasks);
-  processor->setMaxConcurrentTasks(maxConcurrentTasks);
 }
 
 void YamlConfiguration::parsePropertiesNodeYaml(
@@ -624,8 +615,8 @@
             "' section of configuration file]";
       }
     }
-    logger_->log_error(errorMessage.c_str());
-    throw std::invalid_argument(errorMessage);
+    logger_->log_error(errMsg.c_str());
+    throw std::invalid_argument(errMsg);
   }
 }