MINIFICPP-1307 - Add attribute-based merge test
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #852
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index 0173ba2..dd6e204 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -752,3 +752,80 @@
}
LogTestController::getInstance().reset();
}
+
+TEST_CASE("MergeFileOnAttribute", "[mergefiletest5]") {
+ {
+ std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+ std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+
+ // Create and write to the test file
+ for (int i = 0; i < 6; i++) {
+ std::ofstream{std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt", std::ios::binary} << std::to_string(i);
+ if (i % 2 == 0)
+ expectfileFirst << std::to_string(i);
+ else
+ expectfileSecond << std::to_string(i);
+ }
+ }
+
+ MergeTestController testController;
+ auto context = testController.context;
+ auto processor = testController.processor;
+ auto input = testController.input;
+ auto output = testController.output;
+
+ context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+ context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
+ context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, DELIMITER_STRATEGY_TEXT);
+ context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MinEntries, "3");
+ context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
+
+ core::ProcessSession sessionGenFlowFile(context);
+ std::shared_ptr<core::FlowFile> record[6];
+
+ // Generate 6 flowfiles, even files are merged to one, odd files are merged to an other
+ for (int i = 0; i < 6; i++) {
+ std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+ std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
+ sessionGenFlowFile.import(flowFileName, flow, true, 0);
+ if (i % 2 == 0)
+ flow->setAttribute("tag", "even");
+ else
+ flow->setAttribute("tag", "odd");
+ record[i] = flow;
+ }
+ input->put(record[0]);
+ input->put(record[1]);
+ input->put(record[2]);
+ input->put(record[3]);
+ input->put(record[4]);
+ input->put(record[5]);
+
+ REQUIRE(processor->getName() == "mergecontent");
+ auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+ processor->onSchedule(context, factory);
+ for (int i = 0; i < 6; i++) {
+ auto session = std::make_shared<core::ProcessSession>(context);
+ processor->onTrigger(context, session);
+ session->commit();
+ }
+ // validate the merge content
+ std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+ std::shared_ptr<core::FlowFile> flow1 = output->poll(expiredFlowRecords);
+ std::shared_ptr<core::FlowFile> flow2 = output->poll(expiredFlowRecords);
+ {
+ FixedBuffer callback(flow1->getSize());
+ sessionGenFlowFile.read(flow1, &callback);
+ std::ifstream file1(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
+ std::string contents((std::istreambuf_iterator<char>(file1)), std::istreambuf_iterator<char>());
+ REQUIRE(callback.to_string() == contents);
+ }
+ {
+ FixedBuffer callback(flow2->getSize());
+ sessionGenFlowFile.read(flow2, &callback);
+ std::ifstream file2(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
+ std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
+ REQUIRE(callback.to_string() == contents);
+ }
+ LogTestController::getInstance().reset();
+}