MINIFICPP-1412 - Create ResouceClaim if processor fails to do so

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #944
diff --git a/extensions/standard-processors/processors/GenerateFlowFile.cpp b/extensions/standard-processors/processors/GenerateFlowFile.cpp
index 08abbbd..3008e23 100644
--- a/extensions/standard-processors/processors/GenerateFlowFile.cpp
+++ b/extensions/standard-processors/processors/GenerateFlowFile.cpp
@@ -124,16 +124,16 @@
       logger_->log_error("Failed to create flowfile!");
       return;
     }
-    if (fileSize_ > 0) {
-      if (uniqueFlowFile_) {
-        std::vector<char> data(gsl::narrow<size_t>(fileSize_));
+    if (uniqueFlowFile_) {
+      std::vector<char> data(gsl::narrow<size_t>(fileSize_));
+      if (fileSize_ > 0) {
         generateData(data, textData_);
-        GenerateFlowFile::WriteCallback callback(std::move(data));
-        session->write(flowFile, &callback);
-      } else {
-        GenerateFlowFile::WriteCallback callback(data_);
-        session->write(flowFile, &callback);
       }
+      GenerateFlowFile::WriteCallback callback(std::move(data));
+      session->write(flowFile, &callback);
+    } else {
+      GenerateFlowFile::WriteCallback callback(data_);
+      session->write(flowFile, &callback);
     }
     session->transfer(flowFile, Success);
   }
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index 9fefd92..5384459 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -41,6 +41,7 @@
 #include "core/ProcessorNode.h"
 #include "core/reporting/SiteToSiteProvenanceReportingTask.h"
 #include "utils/PropertyErrors.h"
+#include "utils/IntegrationTestUtils.h"
 
 TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
   TestController testController;
@@ -235,6 +236,7 @@
   TestController testController;
   LogTestController::getInstance().setDebug<minifi::processors::GenerateFlowFile>();
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  content_repo->initialize(std::make_shared<minifi::Configure>());
   std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GenerateFlowFile>("GFF");
   processor->initialize();
   processor->setProperty(processors::GenerateFlowFile::BatchSize, "10");
@@ -539,7 +541,7 @@
 
   plan->runNextProcessor();
 
-  // segfault
+  REQUIRE(utils::verifyLogLinePresenceInPollTime(std::chrono::seconds{0}, "did not create a ResourceClaim"));
 
   LogTestController::getInstance().reset();
 }
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 60ddafc..743adf8 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -170,6 +170,10 @@
   void persistFlowFilesBeforeTransfer(
       std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<core::FlowFile>>>& transactionMap,
       const std::map<utils::Identifier, FlowFileUpdate>& modifiedFlowFiles);
+
+  void ensureNonNullResourceClaim(
+      const std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<core::FlowFile>>>& transactionMap);
+
   // Clone the flow file during transfer to multiple connections for a relationship
   std::shared_ptr<core::FlowFile> cloneDuringTransfer(const std::shared_ptr<core::FlowFile> &parent);
   // ProcessContext
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 6a8273a..7be0feb 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -744,6 +744,8 @@
         }
     }
 
+    ensureNonNullResourceClaim(connectionQueues);
+
     content_session_->commit();
 
     persistFlowFilesBeforeTransfer(connectionQueues, _updatedFlowFiles);
@@ -891,6 +893,22 @@
   }
 }
 
+void ProcessSession::ensureNonNullResourceClaim(
+    const std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<core::FlowFile>>> &transactionMap) {
+  for (auto& transaction : transactionMap) {
+    for (auto& flowFile : transaction.second) {
+      auto claim = flowFile->getResourceClaim();
+      if (!claim) {
+        logger_->log_debug("Processor %s (%s) did not create a ResourceClaim, creating an empty one",
+                           process_context_->getProcessorNode()->getUUIDStr(),
+                           process_context_->getProcessorNode()->getName());
+        OutputStreamPipe emptyStreamCallback(std::make_shared<io::BufferStream>());
+        write(flowFile, &emptyStreamCallback);
+      }
+    }
+  }
+}
+
 std::shared_ptr<core::FlowFile> ProcessSession::get() {
   std::shared_ptr<Connectable> first = process_context_->getProcessorNode()->pickIncomingConnection();