MINIFICPP-1412 - Create ResouceClaim if processor fails to do so
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #944
diff --git a/extensions/standard-processors/processors/GenerateFlowFile.cpp b/extensions/standard-processors/processors/GenerateFlowFile.cpp
index 08abbbd..3008e23 100644
--- a/extensions/standard-processors/processors/GenerateFlowFile.cpp
+++ b/extensions/standard-processors/processors/GenerateFlowFile.cpp
@@ -124,16 +124,16 @@
logger_->log_error("Failed to create flowfile!");
return;
}
- if (fileSize_ > 0) {
- if (uniqueFlowFile_) {
- std::vector<char> data(gsl::narrow<size_t>(fileSize_));
+ if (uniqueFlowFile_) {
+ std::vector<char> data(gsl::narrow<size_t>(fileSize_));
+ if (fileSize_ > 0) {
generateData(data, textData_);
- GenerateFlowFile::WriteCallback callback(std::move(data));
- session->write(flowFile, &callback);
- } else {
- GenerateFlowFile::WriteCallback callback(data_);
- session->write(flowFile, &callback);
}
+ GenerateFlowFile::WriteCallback callback(std::move(data));
+ session->write(flowFile, &callback);
+ } else {
+ GenerateFlowFile::WriteCallback callback(data_);
+ session->write(flowFile, &callback);
}
session->transfer(flowFile, Success);
}
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index 9fefd92..5384459 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -41,6 +41,7 @@
#include "core/ProcessorNode.h"
#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
#include "utils/PropertyErrors.h"
+#include "utils/IntegrationTestUtils.h"
TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
TestController testController;
@@ -235,6 +236,7 @@
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::GenerateFlowFile>();
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(std::make_shared<minifi::Configure>());
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GenerateFlowFile>("GFF");
processor->initialize();
processor->setProperty(processors::GenerateFlowFile::BatchSize, "10");
@@ -539,7 +541,7 @@
plan->runNextProcessor();
- // segfault
+ REQUIRE(utils::verifyLogLinePresenceInPollTime(std::chrono::seconds{0}, "did not create a ResourceClaim"));
LogTestController::getInstance().reset();
}
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 60ddafc..743adf8 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -170,6 +170,10 @@
void persistFlowFilesBeforeTransfer(
std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<core::FlowFile>>>& transactionMap,
const std::map<utils::Identifier, FlowFileUpdate>& modifiedFlowFiles);
+
+ void ensureNonNullResourceClaim(
+ const std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<core::FlowFile>>>& transactionMap);
+
// Clone the flow file during transfer to multiple connections for a relationship
std::shared_ptr<core::FlowFile> cloneDuringTransfer(const std::shared_ptr<core::FlowFile> &parent);
// ProcessContext
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 6a8273a..7be0feb 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -744,6 +744,8 @@
}
}
+ ensureNonNullResourceClaim(connectionQueues);
+
content_session_->commit();
persistFlowFilesBeforeTransfer(connectionQueues, _updatedFlowFiles);
@@ -891,6 +893,22 @@
}
}
+void ProcessSession::ensureNonNullResourceClaim(
+ const std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<core::FlowFile>>> &transactionMap) {
+ for (auto& transaction : transactionMap) {
+ for (auto& flowFile : transaction.second) {
+ auto claim = flowFile->getResourceClaim();
+ if (!claim) {
+ logger_->log_debug("Processor %s (%s) did not create a ResourceClaim, creating an empty one",
+ process_context_->getProcessorNode()->getUUIDStr(),
+ process_context_->getProcessorNode()->getName());
+ OutputStreamPipe emptyStreamCallback(std::make_shared<io::BufferStream>());
+ write(flowFile, &emptyStreamCallback);
+ }
+ }
+ }
+}
+
std::shared_ptr<core::FlowFile> ProcessSession::get() {
std::shared_ptr<Connectable> first = process_context_->getProcessorNode()->pickIncomingConnection();