| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <algorithm> |
| #include <cstdio> |
| #include <fstream> |
| #include <iostream> |
| #include <random> |
| #include <set> |
| #include <string> |
| |
| #include "FlowController.h" |
| #include "LogAttribute.h" |
| #include "TailFile.h" |
| #include "TextFragmentUtils.h" |
| #include "catch2/generators/catch_generators.hpp" |
| #include "core/Core.h" |
| #include "minifi-cpp/core/ProcessContext.h" |
| #include "core/ProcessSession.h" |
| #include "core/Processor.h" |
| #include "core/Resource.h" |
| #include "unit/Catch.h" |
| #include "unit/ProvenanceTestHelper.h" |
| #include "unit/SingleProcessorTestController.h" |
| #include "unit/TestBase.h" |
| #include "unit/TestUtils.h" |
| #include "utils/StringUtils.h" |
| #include "utils/file/FileUtils.h" |
| #include "controllers/AttributeProviderService.h" |
| |
| using namespace std::literals::chrono_literals; |
| |
| static constexpr std::string_view NEWLINE_FILE = "" // NOLINT |
| "one,two,three\n" |
| "four,five,six, seven"; |
| static constexpr std::string_view TMP_FILE = "minifi-tmpfile.txt"; |
| static constexpr std::string_view ROLLED_OVER_TMP_FILE = "minifi-tmpfile.txt.old.1"; |
| static constexpr std::string_view STATE_FILE = "minifi-state-file.txt"; |
| static constexpr std::string_view ROLLED_OVER_TAIL_DATA = "rolled_over_data\n"; |
| static constexpr std::string_view NEW_TAIL_DATA = "newdata\n"; |
| static constexpr std::string_view ADDITIONALY_CREATED_FILE_CONTENT = "additional file data\n"; |
| |
| namespace { |
| std::filesystem::path createTempFile(const std::filesystem::path& directory, const std::filesystem::path& file_name, |
| const std::string_view contents, const std::ios_base::openmode open_mode = std::ios::out | std::ios::binary, |
| const std::chrono::file_clock::duration offset = 0ms) { |
| if (!utils::file::exists(directory)) { |
| std::filesystem::create_directories(directory); |
| } |
| auto full_file_name = directory / file_name; |
| std::ofstream{full_file_name, open_mode} << contents; |
| std::filesystem::last_write_time(full_file_name, std::chrono::file_clock::now() + offset); |
| |
| return full_file_name; |
| } |
| |
| void appendTempFile(const std::filesystem::path& directory, const std::filesystem::path& file_name, |
| const std::string_view contents, const std::ios_base::openmode open_mode = std::ios::app | std::ios::binary) { |
| createTempFile(directory, file_name, contents, open_mode); |
| } |
| |
| } // namespace |
| |
| TEST_CASE("TailFile reads the file until the first delimiter then picks up the second line if a delimiter is written between runs", "[simple]") { |
| // Create and write to the test file |
| minifi::test::SingleProcessorTestController test_controller(minifi::test::utils::make_processor<minifi::processors::TailFile>("TailFile")); |
| |
| auto dir = test_controller.createTempDirectory(); |
| auto temp_file_path = dir / TMP_FILE; |
| |
| { |
| std::ofstream tmp_file; |
| tmp_file.open(temp_file_path, std::ios::out | std::ios::binary); |
| tmp_file << NEWLINE_FILE; |
| } |
| |
| test_controller.plan->setProperty(test_controller.getProcessor(), minifi::processors::TailFile::FileName, temp_file_path.string()); |
| test_controller.plan->setProperty(test_controller.getProcessor(), minifi::processors::TailFile::Delimiter, "\n"); |
| |
| { |
| auto trigger_res = test_controller.trigger(); |
| const auto& success_ffs = trigger_res.at(minifi::processors::TailFile::Success); |
| REQUIRE(success_ffs.size() == 1); |
| CHECK(test_controller.plan->getContent(success_ffs[0]) == "one,two,three\n"); |
| CHECK(success_ffs[0]->getAttribute(minifi::processors::textfragmentutils::BASE_NAME_ATTRIBUTE) == "minifi-tmpfile"); |
| CHECK(success_ffs[0]->getAttribute(minifi::processors::textfragmentutils::POST_NAME_ATTRIBUTE) == "txt"); |
| CHECK(success_ffs[0]->getAttribute(minifi::processors::textfragmentutils::OFFSET_ATTRIBUTE) == "0"); |
| CHECK(success_ffs[0]->getAttribute(core::SpecialFlowAttribute::PATH) == dir.string()); |
| CHECK(success_ffs[0]->getAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH) == temp_file_path.string() ); |
| CHECK(success_ffs[0]->getAttribute(core::SpecialFlowAttribute::FILENAME) == "minifi-tmpfile.0-13.txt"); |
| } |
| { |
| std::ofstream appendStream; |
| appendStream.open(temp_file_path, std::ios_base::app | std::ios_base::binary); |
| appendStream << std::endl; |
| } |
| { |
| auto trigger_res = test_controller.trigger(); |
| const auto& success_ffs = trigger_res.at(minifi::processors::TailFile::Success); |
| REQUIRE(success_ffs.size() == 1); |
| CHECK(test_controller.plan->getContent(success_ffs[0]) == "four,five,six, seven\n"); |
| CHECK(success_ffs[0]->getAttribute(minifi::processors::textfragmentutils::BASE_NAME_ATTRIBUTE) == "minifi-tmpfile"); |
| CHECK(success_ffs[0]->getAttribute(minifi::processors::textfragmentutils::POST_NAME_ATTRIBUTE) == "txt"); |
| CHECK(success_ffs[0]->getAttribute(minifi::processors::textfragmentutils::OFFSET_ATTRIBUTE) == "14"); |
| CHECK(success_ffs[0]->getAttribute(core::SpecialFlowAttribute::PATH) == dir.string()); |
| CHECK(success_ffs[0]->getAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH) == temp_file_path.string() ); |
| CHECK(success_ffs[0]->getAttribute(core::SpecialFlowAttribute::FILENAME) == "minifi-tmpfile.14-34.txt"); |
| } |
| } |
| |
| TEST_CASE("TailFile re-reads the file if the state is deleted between runs", "[state]") { |
| // Create and write to the test file |
| |
| TestController testController; |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setDebug<core::ProcessSession>(); |
| LogTestController::getInstance().setTrace<TestPlan>(); |
| LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tailfile = plan->addProcessor("TailFile", "tailfileProc"); |
| |
| plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true); |
| |
| auto dir = testController.createTempDirectory(); |
| auto temp_file_path = dir / TMP_FILE; |
| |
| { |
| std::ofstream tmp_file; |
| tmp_file.open(temp_file_path, std::ios::out | std::ios::binary); |
| tmp_file << NEWLINE_FILE; |
| } |
| |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, temp_file_path.string()); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile.0-13.txt")); |
| |
| plan->reset(true); // start a new but with state file |
| LogTestController::getInstance().clear(); |
| |
| plan->getProcessContextForProcessor(tailfile)->createStateManager()->clear(); |
| |
| TestController::runSession(plan, true); |
| |
| // if we lose state we restart |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile.0-13.txt")); |
| } |
| |
| TEST_CASE("TailFile picks up the state correctly if it is rewritten between runs", "[state]") { |
| // Create and write to the test file |
| TestController testController; |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setDebug<core::ProcessSession>(); |
| LogTestController::getInstance().setTrace<TestPlan>(); |
| LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tailfile = plan->addProcessor("TailFile", "tailfileProc"); |
| |
| plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true); |
| |
| auto dir = testController.createTempDirectory(); |
| auto temp_file_path = dir / TMP_FILE; |
| |
| std::ofstream tmpfile; |
| tmpfile.open(temp_file_path, std::ios::out | std::ios::binary); |
| tmpfile << NEWLINE_FILE; |
| tmpfile.close(); |
| |
| std::ofstream appendStream; |
| appendStream.open(temp_file_path, std::ios_base::app | std::ios_base::binary); |
| appendStream.write("\n", 1); |
| appendStream.close(); |
| |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, temp_file_path.string()); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); |
| |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile.0-13.txt")); |
| |
| REQUIRE(temp_file_path.has_filename()); |
| REQUIRE(temp_file_path.has_parent_path()); |
| |
| // should stay the same |
| for (int i = 0; i < 5; i++) { |
| plan->reset(true); // start a new but with state file |
| LogTestController::getInstance().clear(); |
| |
| plan->getProcessContextForProcessor(tailfile)->createStateManager()->set({{"file.0.name", temp_file_path.filename().string()}, |
| {"file.0.position", "14"}, |
| {"file.0.current", temp_file_path.string()}}); |
| |
| TestController::runSession(plan, true); |
| |
| // if we lose state we restart |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile.14-34.txt")); |
| } |
| for (int i = 14; i < 34; i++) { |
| plan->reset(true); // start a new but with state file |
| |
| plan->getProcessContextForProcessor(tailfile)->createStateManager()->set({{"file.0.name", temp_file_path.filename().string()}, |
| {"file.0.position", std::to_string(i)}, |
| {"file.0.current", temp_file_path.string()}}); |
| |
| TestController::runSession(plan, true); |
| } |
| |
| plan->runCurrentProcessor(); |
| for (int i = 14; i < 34; i++) { |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile." + std::to_string(i) + "-34.txt")); |
| } |
| } |
| |
| TEST_CASE("TailFile converts the old-style state file to the new-style state", "[state][migration]") { |
| // Create and write to the test file |
| TestController testController; |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setDebug<core::ProcessSession>(); |
| LogTestController::getInstance().setTrace<TestPlan>(); |
| LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); |
| |
| auto plan = testController.createPlan(); |
| auto tailfile = plan->addProcessor("TailFile", "tailfileProc"); |
| auto id = tailfile->getUUIDStr(); |
| |
| auto logattribute = plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true); |
| plan->setProperty(logattribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| |
| auto dir = testController.createTempDirectory(); |
| auto state_file_path = dir / STATE_FILE; |
| |
| auto new_state_file_path = state_file_path.string() + ("." + id); |
| |
| SECTION("single") { |
| const auto temp_file = createTempFile(dir, TMP_FILE, std::string(NEWLINE_FILE) + '\n'); |
| |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, temp_file.string()); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile, state_file_path.string()); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); |
| |
| std::ofstream new_state_file; |
| new_state_file.open(new_state_file_path); |
| SECTION("legacy") { |
| new_state_file << "FILENAME=" << temp_file.string() << std::endl; |
| new_state_file << "POSITION=14" << std::endl; |
| } |
| SECTION("newer single") { |
| new_state_file << "FILENAME=" << TMP_FILE << std::endl; |
| new_state_file << "POSITION." << TMP_FILE << "=14" << std::endl; |
| new_state_file << "CURRENT." << TMP_FILE << "=" << temp_file.string() << std::endl; |
| } |
| new_state_file.close(); |
| |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:minifi-tmpfile.14-34.txt")); |
| |
| std::unordered_map<std::string, std::string> state; |
| REQUIRE(plan->getProcessContextForProcessor(tailfile)->createStateManager()->get(state)); |
| |
| REQUIRE(temp_file.has_filename()); |
| REQUIRE(temp_file.has_parent_path()); |
| std::unordered_map<std::string, std::string> expected_state{{"file.0.name", temp_file.filename().string()}, |
| {"file.0.position", "35"}, |
| {"file.0.current", temp_file.string()}, |
| {"file.0.checksum", "1404369522"}}; |
| for (const auto& key_value_pair : expected_state) { |
| const auto it = state.find(key_value_pair.first); |
| REQUIRE(it != state.end()); |
| REQUIRE(it->second == key_value_pair.second); |
| } |
| REQUIRE(state.contains("file.0.last_read_time")); |
| } |
| |
| SECTION("multiple") { |
| const std::string file_name_1 = "bar.txt"; |
| const std::string file_name_2 = "foo.txt"; |
| const auto temp_file_1 = createTempFile(dir, file_name_1, std::string(NEWLINE_FILE) + '\n'); |
| const auto temp_file_2 = createTempFile(dir, file_name_2, std::string(NEWLINE_FILE) + '\n'); |
| |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode, "Multiple file"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::BaseDirectory, dir.string()); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::LookupFrequency, "0 sec"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, ".*\\.txt"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile, state_file_path.string()); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); |
| |
| std::ofstream newstatefile; |
| newstatefile.open(new_state_file_path); |
| newstatefile << "FILENAME=" << file_name_1 << std::endl; |
| newstatefile << "POSITION." << file_name_1 << "=14" << std::endl; |
| newstatefile << "CURRENT." << file_name_1 << "=" << temp_file_1.string() << std::endl; |
| newstatefile << "FILENAME=" << file_name_2 << std::endl; |
| newstatefile << "POSITION." << file_name_2 << "=15" << std::endl; |
| newstatefile << "CURRENT." << file_name_2 << "=" << temp_file_2.string() << std::endl; |
| newstatefile.close(); |
| |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains(file_name_1.substr(0, file_name_1.rfind('.')) + ".14-34.txt")); |
| REQUIRE(LogTestController::getInstance().contains(file_name_2.substr(0, file_name_2.rfind('.')) + ".15-34.txt")); |
| |
| std::unordered_map<std::string, std::string> state; |
| REQUIRE(plan->getProcessContextForProcessor(tailfile)->createStateManager()->get(state)); |
| |
| REQUIRE(temp_file_1.has_parent_path()); |
| REQUIRE(temp_file_1.has_filename()); |
| REQUIRE(temp_file_2.has_parent_path()); |
| REQUIRE(temp_file_2.has_filename()); |
| std::unordered_map<std::string, std::string> expected_state{{"file.0.name", temp_file_1.filename().string()}, |
| {"file.0.position", "35"}, |
| {"file.0.current", temp_file_1.string()}, |
| {"file.0.checksum", "1404369522"}, |
| {"file.1.name", temp_file_2.filename().string()}, |
| {"file.1.position", "35"}, |
| {"file.1.current", temp_file_2.string()}, |
| {"file.1.checksum", "2289158555"}}; |
| for (const auto& key_value_pair : expected_state) { |
| const auto it = state.find(key_value_pair.first); |
| REQUIRE(it != state.end()); |
| REQUIRE(it->second == key_value_pair.second); |
| } |
| REQUIRE(state.contains("file.0.last_read_time")); |
| REQUIRE(state.contains("file.1.last_read_time")); |
| } |
| } |
| |
| TEST_CASE("TailFile picks up the new File to Tail if it is changed between runs", "[state]") { |
| TestController testController; |
| LogTestController::getInstance().setDebug<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tail_file = plan->addProcessor("TailFile", "tail_file"); |
| plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); |
| |
| auto log_attribute = plan->addProcessor("LogAttribute", "log_attribute", core::Relationship("success", "description"), true); |
| plan->setProperty(log_attribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| |
| auto directory = testController.createTempDirectory(); |
| auto first_test_file = createTempFile(directory, "first.log", "my first log line\n"); |
| plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName, first_test_file.string()); |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:first.0-17.log")); |
| |
| SECTION("The new file gets picked up") { |
| auto second_test_file = createTempFile(directory, "second.log", "my second log line\n"); |
| plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName, second_test_file.string()); |
| plan->reset(true); // clear the memory, but keep the state file |
| LogTestController::getInstance().clear(); |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:second.0-18.log")); |
| } |
| |
| SECTION("The old file will no longer be tailed") { |
| appendTempFile(directory, "first.log", "add some more stuff\n"); |
| auto second_test_file = createTempFile(directory, "second.log", ""); |
| plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName, second_test_file.string()); |
| plan->reset(true); // clear the memory, but keep the state file |
| LogTestController::getInstance().clear(); |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files")); |
| } |
| } |
| |
| TEST_CASE("TailFile picks up the new File to Tail if it is changed between runs (multiple file mode)", "[state][multiple_file]") { |
| TestController testController; |
| LogTestController::getInstance().setDebug<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); |
| |
| auto directory = testController.createTempDirectory(); |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tail_file = plan->addProcessor("TailFile", "tail_file"); |
| plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); |
| plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::TailMode, "Multiple file"); |
| plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::BaseDirectory, directory.string()); |
| plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::LookupFrequency, "0 sec"); |
| plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName, "first\\..*\\.log"); |
| |
| auto log_attribute = plan->addProcessor("LogAttribute", "log_attribute", core::Relationship("success", "description"), true); |
| plan->setProperty(log_attribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| |
| createTempFile(directory, "first.fruit.log", "apple\n"); |
| createTempFile(directory, "second.fruit.log", "orange\n"); |
| createTempFile(directory, "first.animal.log", "hippopotamus\n"); |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:first.fruit.0-5.log")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:first.animal.0-12.log")); |
| |
| appendTempFile(directory, "first.fruit.log", "banana\n"); |
| appendTempFile(directory, "first.animal.log", "hedgehog\n"); |
| |
| SECTION("If a file no longer matches the new regex, then we stop tailing it") { |
| plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName, "first\\.f.*\\.log"); |
| plan->reset(true); // clear the memory, but keep the state file |
| LogTestController::getInstance().clear(); |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:first.fruit.6-12.log")); |
| } |
| |
| SECTION("If a new file matches the new regex, we start tailing it") { |
| plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName, ".*\\.fruit\\.log"); |
| plan->reset(true); // clear the memory, but keep the state file |
| LogTestController::getInstance().clear(); |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 2 flow file")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:first.fruit.6-12.log")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:second.fruit.0-6.log")); |
| } |
| } |
| |
| TEST_CASE("TailFile finds the single input file in both Single and Multiple mode", "[simple]") { |
| TestController testController; |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setDebug<core::ProcessSession>(); |
| LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tailfile = plan->addProcessor("TailFile", "tailfileProc"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter, ""); |
| |
| plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true); |
| |
| auto dir = testController.createTempDirectory(); |
| auto temp_file_path = dir / TMP_FILE; |
| std::ofstream tmpfile; |
| tmpfile.open(temp_file_path, std::ios::out | std::ios::binary); |
| tmpfile << NEWLINE_FILE; |
| tmpfile.close(); |
| |
| SECTION("Single") { |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, temp_file_path.string()); |
| } |
| |
| SECTION("Multiple") { |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, "minifi-.*\\.txt"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode, "Multiple file"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::BaseDirectory, dir.string()); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::LookupFrequency, "0 sec"); |
| } |
| |
| TestController::runSession(plan, false); |
| auto records = plan->getProvenanceRecords(); |
| REQUIRE(records.size() == 2); |
| |
| TestController::runSession(plan, false); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.size()) + " Offset:0")); |
| |
| LogTestController::getInstance().reset(); |
| } |
| |
| TEST_CASE("TailFile picks up new files created between runs", "[multiple_file]") { |
| TestController testController; |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setDebug<core::ProcessSession>(); |
| LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); |
| |
| auto dir = testController.createTempDirectory(); |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tailfile = plan->addProcessor("TailFile", "tailfile"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode, "Multiple file"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::BaseDirectory, dir.string()); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::LookupFrequency, "0 sec"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, ".*\\.log"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); |
| |
| auto logattribute = plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true); |
| plan->setProperty(logattribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| |
| createTempFile(dir, "application.log", "line1\nline2\n"); |
| |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); |
| |
| createTempFile(dir, "another.log", "some more content\n"); |
| |
| plan->reset(); |
| LogTestController::getInstance().clear(); |
| |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file")); |
| |
| LogTestController::getInstance().reset(); |
| } |
| |
| TEST_CASE("TailFile can handle input files getting removed", "[multiple_file]") { |
| TestController testController; |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setDebug<core::ProcessSession>(); |
| LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); |
| |
| auto dir = testController.createTempDirectory(); |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tailfile = plan->addProcessor("TailFile", "tailfile"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode, "Multiple file"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::BaseDirectory, dir.string()); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::LookupFrequency, "0 sec"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, ".*\\.log"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); |
| |
| auto logattribute = plan->addProcessor("LogAttribute", "logattribute", |
| core::Relationship("success", "description"), |
| true); |
| plan->setProperty(logattribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| |
| createTempFile(dir, "one.log", "line one\n"); |
| createTempFile(dir, "two.log", "some stuff\n"); |
| |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); |
| |
| plan->reset(); |
| LogTestController::getInstance().clear(); |
| |
| appendTempFile(dir, "one.log", "line two\nline three\nline four\n"); |
| CHECK(std::filesystem::remove(dir / "two.log")); |
| |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 3 flow files")); |
| |
| LogTestController::getInstance().reset(); |
| } |
| |
| TEST_CASE("TailFile processes a very long line correctly", "[simple]") { |
| std::string line1("012\n"); |
| std::string line2(8050, 0); |
| std::mt19937 gen(std::random_device{}()); // NOLINT (linter wants a space before '{') [whitespace/braces] |
| std::generate_n(line2.begin(), line2.size() - 1, [&] { |
| // Make sure to only generate from characters that don't intersect with line1 and 3-4 |
| // Starting generation from 64 ensures that no numeric digit characters are added |
| return gsl::narrow<char>(64 + (gen() % (127 - 64))); |
| }); |
| line2.back() = '\n'; |
| std::string line3("345\n"); |
| std::string line4("6789"); |
| |
| TestController testController; |
| LogTestController::getInstance().setTrace<TestPlan>(); |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>(); |
| LogTestController::getInstance().setTrace<core::ProcessSession>(); |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tailfile = plan->addProcessor("TailFile", "tailfileProc"); |
| |
| auto dir = testController.createTempDirectory(); |
| auto temp_file_path = dir / TMP_FILE; |
| std::ofstream tmpfile; |
| tmpfile.open(temp_file_path, std::ios::out | std::ios::binary); |
| tmpfile << line1 << line2 << line3 << line4; |
| tmpfile.close(); |
| |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, temp_file_path.string()); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); |
| |
| auto log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true); |
| plan->setProperty(log_attr, minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| plan->setProperty(log_attr, minifi::processors::LogAttribute::LogPayload, "true"); |
| plan->setProperty(log_attr, minifi::processors::LogAttribute::HexencodePayload, "true"); |
| |
| uint32_t line_length = 0U; |
| SECTION("with line length 80") { |
| line_length = 80U; |
| plan->setProperty(log_attr, minifi::processors::LogAttribute::MaxPayloadLineLength, "80"); |
| } |
| SECTION("with line length 200") { |
| line_length = 200U; |
| plan->setProperty(log_attr, minifi::processors::LogAttribute::MaxPayloadLineLength, "200"); |
| } |
| SECTION("with line length 0") { |
| line_length = 0U; |
| plan->setProperty(log_attr, minifi::processors::LogAttribute::MaxPayloadLineLength, "0"); |
| } |
| SECTION("with line length 16") { |
| line_length = 16U; |
| plan->setProperty(log_attr, minifi::processors::LogAttribute::MaxPayloadLineLength, "16"); |
| } |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 3 flow files")); |
| REQUIRE(LogTestController::getInstance().contains(utils::string::to_hex(line1))); |
| auto line2_hex = utils::string::to_hex(line2); |
| if (line_length == 0U) { |
| REQUIRE(LogTestController::getInstance().contains(line2_hex)); |
| } else { |
| std::stringstream line2_hex_lines; |
| for (size_t i = 0; i < line2_hex.size(); i += line_length) { |
| line2_hex_lines << line2_hex.substr(i, line_length) << '\n'; |
| } |
| REQUIRE(LogTestController::getInstance().contains(line2_hex_lines.str())); |
| } |
| REQUIRE(LogTestController::getInstance().contains(utils::string::to_hex(line3))); |
| REQUIRE(false == LogTestController::getInstance().contains(utils::string::to_hex(line4), std::chrono::seconds(0))); |
| |
| LogTestController::getInstance().reset(); |
| } |
| |
| TEST_CASE("TailFile processes a long line followed by multiple newlines correctly", "[simple][edge_case]") { |
| // Test having two delimiters on the buffer boundary |
| std::string line1(4098, '\n'); |
| std::mt19937 gen(std::random_device { }()); |
| std::generate_n(line1.begin(), 4095, [&] { |
| // Make sure to only generate from characters that don't intersect with line2-4 |
| // Starting generation from 64 ensures that no numeric digit characters are added |
| return gsl::narrow<char>(64 + (gen() % (127 - 64))); |
| }); |
| std::string line2("012\n"); |
| std::string line3("345\n"); |
| std::string line4("6789"); |
| |
| // Create and write to the test file |
| |
| TestController testController; |
| LogTestController::getInstance().setTrace<TestPlan>(); |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>(); |
| LogTestController::getInstance().setTrace<core::ProcessSession>(); |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tailfile = plan->addProcessor("TailFile", "tailfileProc"); |
| auto dir = testController.createTempDirectory(); |
| auto temp_file_path = dir / TMP_FILE; |
| std::ofstream tmpfile; |
| tmpfile.open(temp_file_path, std::ios::out | std::ios::binary); |
| tmpfile << line1 << line2 << line3 << line4; |
| tmpfile.close(); |
| |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, temp_file_path.string()); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); |
| |
| auto log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true); |
| plan->setProperty(log_attr, minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| plan->setProperty(log_attr, minifi::processors::LogAttribute::LogPayload, "true"); |
| plan->setProperty(log_attr, minifi::processors::LogAttribute::HexencodePayload, "true"); |
| plan->setProperty(log_attr, minifi::processors::LogAttribute::MaxPayloadLineLength, "80"); |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 5 flow files")); |
| auto line1_hex = utils::string::to_hex(line1.substr(0, 4096)); |
| std::stringstream line1_hex_lines; |
| for (size_t i = 0; i < line1_hex.size(); i += 80) { |
| line1_hex_lines << line1_hex.substr(i, 80) << '\n'; |
| } |
| REQUIRE(LogTestController::getInstance().contains(line1_hex_lines.str())); |
| REQUIRE(LogTestController::getInstance().contains(utils::string::to_hex(line2))); |
| REQUIRE(LogTestController::getInstance().contains(utils::string::to_hex(line3))); |
| REQUIRE(false == LogTestController::getInstance().contains(utils::string::to_hex(line4), std::chrono::seconds(0))); |
| |
| LogTestController::getInstance().reset(); |
| } |
| |
| TEST_CASE("TailFile onSchedule throws if file(s) to tail cannot be determined", "[configuration]") { |
| TestController testController; |
| LogTestController::getInstance().setDebug<minifi::processors::TailFile>(); |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tailfile = plan->addProcessor("TailFile", "tailfileProc"); |
| |
| SECTION("Single file mode by default") { |
| SECTION("No FileName") { |
| } |
| |
| SECTION("FileName does not contain the path") { |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, "minifi-log.txt"); |
| } |
| } |
| |
| SECTION("Explicit Single file mode") { |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode, "Single file"); |
| |
| SECTION("No FileName") { |
| } |
| |
| SECTION("FileName does not contain the path") { |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, "minifi-log.txt"); |
| } |
| } |
| |
| SECTION("Multiple file mode") { |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode, "Multiple file"); |
| |
| SECTION("No FileName and no BaseDirectory") { |
| } |
| |
| SECTION("No BaseDirectory") { |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, "minifi-.*\\.txt"); |
| } |
| } |
| |
| REQUIRE_THROWS(plan->runNextProcessor()); |
| } |
| |
| TEST_CASE("TailFile onSchedule throws in Multiple mode if the Base Directory does not exist", "[configuration][multiple_file]") { |
| TestController testController; |
| LogTestController::getInstance().setDebug<minifi::processors::TailFile>(); |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tailfile = plan->addProcessor("TailFile", "tailfileProc"); |
| plan->setProperty(tailfile, minifi::processors::TailFile::TailMode, "Multiple file"); |
| plan->setProperty(tailfile, minifi::processors::TailFile::FileName, ".*\\.log"); |
| |
| SECTION("No Base Directory is set") { |
| REQUIRE_THROWS(plan->runNextProcessor()); |
| } |
| |
| SECTION("Base Directory is set, but does not exist") { |
| std::string nonexistent_file_name{"/no-such-directory/688b01d0-9e5f-11ea-820d-f338c34d39a1/31d1a81a-9e5f-11ea-a77b-8b27d514a452"}; |
| plan->setProperty(tailfile, minifi::processors::TailFile::BaseDirectory, nonexistent_file_name); |
| REQUIRE_THROWS(plan->runNextProcessor()); |
| } |
| |
| SECTION("Base Directory is set and it exists") { |
| auto directory = testController.createTempDirectory(); |
| |
| plan->setProperty(tailfile, minifi::processors::TailFile::BaseDirectory, directory.string()); |
| plan->setProperty(tailfile, minifi::processors::TailFile::LookupFrequency, "0 sec"); |
| REQUIRE_NOTHROW(plan->runNextProcessor()); |
| } |
| } |
| |
| TEST_CASE("TailFile finds and finishes the renamed file and continues with the new log file", "[rotation]") { |
| minifi::test::SingleProcessorTestController test_controller(minifi::test::utils::make_processor<minifi::processors::TailFile>("TailFile")); |
| |
| constexpr char DELIM = ','; |
| constexpr size_t expected_pieces = std::ranges::count(NEWLINE_FILE, DELIM); // The last piece is left as considered unfinished |
| |
| LogTestController::getInstance().setTrace<TestPlan>(); |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>(); |
| |
| auto dir = test_controller.createTempDirectory(); |
| auto in_file = dir / "testfifo.txt"; |
| |
| { |
| std::ofstream in_file_stream(in_file, std::ios::out | std::ios::binary); |
| in_file_stream << NEWLINE_FILE; |
| } |
| std::filesystem::last_write_time(in_file, std::chrono::file_clock::now() - 200ms); |
| |
| // Build MiNiFi processing graph |
| test_controller.plan->setProperty(test_controller.getProcessor(), minifi::processors::TailFile::Delimiter, std::string(1, DELIM)); |
| |
| SECTION("single") { |
| test_controller.plan->setProperty(test_controller.getProcessor(), minifi::processors::TailFile::FileName, in_file.string()); |
| } |
| SECTION("Multiple") { |
| test_controller.plan->setProperty(test_controller.getProcessor(), org::apache::nifi::minifi::processors::TailFile::FileName, "testfifo.txt"); |
| test_controller.plan->setProperty(test_controller.getProcessor(), org::apache::nifi::minifi::processors::TailFile::TailMode, "Multiple file"); |
| test_controller.plan->setProperty(test_controller.getProcessor(), org::apache::nifi::minifi::processors::TailFile::BaseDirectory, dir.string()); |
| test_controller.plan->setProperty(test_controller.getProcessor(), org::apache::nifi::minifi::processors::TailFile::LookupFrequency, "0 sec"); |
| } |
| { |
| auto res = test_controller.trigger(); |
| const auto& success_ffs = res.at(minifi::processors::TailFile::Success); |
| REQUIRE(success_ffs.size() == expected_pieces); |
| CHECK(test_controller.plan->getContent(success_ffs[0]) == "one,"); |
| CHECK(test_controller.plan->getContent(success_ffs[1]) == "two,"); |
| CHECK(test_controller.plan->getContent(success_ffs[2]) == "three\nfour,"); |
| CHECK(test_controller.plan->getContent(success_ffs[3]) == "five,"); |
| CHECK(test_controller.plan->getContent(success_ffs[4]) == "six,"); |
| } |
| |
| auto rotated_file = in_file; |
| rotated_file += ".1"; |
| |
| REQUIRE_NOTHROW(std::filesystem::rename(in_file, rotated_file)); |
| std::filesystem::last_write_time(rotated_file, std::chrono::file_clock::now()); |
| |
| { |
| std::ofstream new_in_file_stream(in_file, std::ios::out | std::ios::binary); |
| new_in_file_stream << "five" << DELIM << "six" << DELIM; |
| } |
| std::filesystem::last_write_time(in_file, std::chrono::file_clock::now() - 100ms); |
| |
| { |
| auto res = test_controller.trigger(); |
| const auto& success_ffs = res.at(minifi::processors::TailFile::Success); |
| CHECK(success_ffs.size() == 3); |
| CHECK(success_ffs[0]->getAttribute(core::SpecialFlowAttribute::FILENAME) == "testfifo.txt.28-33.1"); |
| CHECK(test_controller.plan->getContent(success_ffs[0]) == " seven"); |
| CHECK(success_ffs[1]->getAttribute(core::SpecialFlowAttribute::FILENAME) == "testfifo.0-4.txt"); |
| CHECK(test_controller.plan->getContent(success_ffs[1]) == "five,"); |
| CHECK(success_ffs[2]->getAttribute(core::SpecialFlowAttribute::FILENAME) == "testfifo.5-8.txt"); |
| CHECK(test_controller.plan->getContent(success_ffs[2]) == "six,"); |
| }; |
| } |
| |
| TEST_CASE("TailFile finds and finishes multiple rotated files and continues with the new log file", "[rotation]") { |
| minifi::test::SingleProcessorTestController test_controller(minifi::test::utils::make_processor<minifi::processors::TailFile>("TailFile")); |
| |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| |
| constexpr char DELIM = ':'; |
| |
| auto dir = test_controller.createTempDirectory(); |
| auto fruits_log_path = dir / "fruits.log"; |
| |
| REQUIRE(test_controller.getProcessor()->setProperty(minifi::processors::TailFile::FileName.name, fruits_log_path.string())); |
| REQUIRE(test_controller.getProcessor()->setProperty(minifi::processors::TailFile::Delimiter.name, std::string(1, DELIM))); |
| |
| const auto file_modi_time_t0 = std::chrono::file_clock::now(); |
| { |
| std::ofstream test_file_stream(fruits_log_path, std::ios::binary); |
| test_file_stream << "Apple" << DELIM << "Orange" << DELIM; |
| } |
| std::filesystem::last_write_time(fruits_log_path, file_modi_time_t0 - 1s); |
| |
| { |
| const auto result = test_controller.trigger(); |
| const auto& success_ffs = result.at(minifi::processors::TailFile::Success); |
| REQUIRE(success_ffs.size() == 2); |
| CHECK(test_controller.plan->getContent(success_ffs[0]) == "Apple:"); |
| CHECK(test_controller.plan->getContent(success_ffs[1]) == "Orange:"); |
| } |
| |
| { |
| std::ofstream test_file_stream(fruits_log_path, std::ios::binary | std::ios::app); |
| test_file_stream << "Pear" << DELIM; |
| } |
| |
| auto first_rotated_file = dir / "fruits.0.log"; |
| REQUIRE_NOTHROW(std::filesystem::rename(fruits_log_path, first_rotated_file)); |
| |
| { |
| std::ofstream test_file_stream_1(fruits_log_path, std::ios::binary); |
| test_file_stream_1 << "Pineapple" << DELIM << "Kiwi" << DELIM; |
| } |
| |
| auto second_rotated_file = dir / "fruits.1.log"; |
| REQUIRE_NOTHROW(std::filesystem::rename(fruits_log_path, second_rotated_file)); |
| |
| std::ofstream test_file_stream_2(fruits_log_path, std::ios::binary); |
| test_file_stream_2 << "Apricot" << DELIM; |
| test_file_stream_2.close(); |
| |
| { |
| const auto result = test_controller.trigger(); |
| const auto& success_ffs = result.at(minifi::processors::TailFile::Success); |
| REQUIRE(success_ffs.size() == 4); |
| CHECK(test_controller.plan->getContent(success_ffs[0]) == "Pear:"); |
| CHECK(test_controller.plan->getContent(success_ffs[1]) == "Pineapple:"); |
| CHECK(test_controller.plan->getContent(success_ffs[2]) == "Kiwi:"); |
| CHECK(test_controller.plan->getContent(success_ffs[3]) == "Apricot:"); |
| } |
| } |
| |
| TEST_CASE("TailFile ignores old rotated files", "[rotation]") { |
| TestController testController; |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setDebug<core::ProcessSession>(); |
| LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); |
| |
| const auto dir = testController.createTempDirectory(); |
| auto log_file_name = dir / "test.log"; |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tailfile = plan->addProcessor("TailFile", "tailfile"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, log_file_name.string()); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); |
| |
| auto logattribute = plan->addProcessor("LogAttribute", "logattribute", |
| core::Relationship("success", "description"), |
| true); |
| plan->setProperty(logattribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| |
| createTempFile(dir, "test.2019-08-20", "line1\nline2\nline3\nline4\n", std::ios::out | std::ios::binary, -1s); // very old rotated file |
| createTempFile(dir, "test.log", "line5\nline6\nline7\n"); |
| |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 3 flow files")); |
| REQUIRE(false == LogTestController::getInstance().contains("key:filename value:test.2019-08-20", std::chrono::seconds(0))); |
| |
| auto rotated_log_file_name = dir / "test.2020-05-18"; |
| REQUIRE_NOTHROW(std::filesystem::rename(log_file_name, rotated_log_file_name)); |
| |
| createTempFile(dir, "test.log", "line8\nline9\n"); |
| |
| plan->reset(); |
| LogTestController::getInstance().clear(); |
| |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); |
| REQUIRE(false == LogTestController::getInstance().contains("key:filename value:test.2019-08-20", std::chrono::seconds(0))); |
| |
| LogTestController::getInstance().reset(); |
| } |
| |
| TEST_CASE("TailFile rotation works with multiple input files", "[rotation][multiple_file]") { |
| TestController testController; |
| |
| LogTestController::getInstance().setTrace<TestPlan>(); |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>(); |
| |
| auto plan = testController.createPlan(); |
| auto dir = testController.createTempDirectory(); |
| |
| createTempFile(dir, "fruit.log", "apple\npear\nbanana\n", std::ios::out | std::ios::binary, -1100ms); |
| createTempFile(dir, "animal.log", "bear\ngiraffe\n", std::ios::out | std::ios::binary, -1100ms); |
| createTempFile(dir, "color.log", "red\nblue\nyellow\npurple\n", std::ios::out | std::ios::binary, -1100ms); |
| |
| auto tail_file = plan->addProcessor("TailFile", "Tail"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::Delimiter, "\n"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::TailMode, "Multiple file"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::FileName, ".*\\.log"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::BaseDirectory, dir.string()); |
| plan->setProperty(tail_file, minifi::processors::TailFile::LookupFrequency, "0 sec"); |
| |
| auto log_attribute = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true); |
| plan->setProperty(log_attribute, minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged " + std::to_string(3 + 2 + 4) + " flow files")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:fruit.0-5.log")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:fruit.6-10.log")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:fruit.11-17.log")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:animal.0-4.log")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:animal.5-12.log")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:color.0-3.log")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:color.4-8.log")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:color.9-15.log")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:color.16-22.log")); |
| |
| appendTempFile(dir, "fruit.log", "orange\n"); |
| appendTempFile(dir, "animal.log", "axolotl\n"); |
| appendTempFile(dir, "color.log", "aquamarine\n"); |
| |
| std::filesystem::rename(dir / "fruit.log", dir / "fruit.0"); |
| std::filesystem::rename(dir / "animal.log", dir / "animal.0"); |
| |
| createTempFile(dir, "fruit.log", "peach\n"); |
| createTempFile(dir, "animal.log", "dinosaur\n"); |
| appendTempFile(dir, "color.log", "turquoise\n"); |
| |
| plan->reset(); |
| LogTestController::getInstance().clear(); |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 6 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:fruit.18-24.0")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:fruit.0-5.log")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:animal.13-20.0")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:animal.0-8.log")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:color.23-33.log")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:color.34-43.log")); |
| } |
| |
| TEST_CASE("TailFile handles the Rolling Filename Pattern property correctly", "[rotation]") { |
| TestController testController; |
| |
| LogTestController::getInstance().setTrace<TestPlan>(); |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>(); |
| |
| auto plan = testController.createPlan(); |
| auto dir = testController.createTempDirectory(); |
| auto test_file = createTempFile(dir, "test.log", "some stuff\n", std::ios::out | std::ios::binary, -100ms); |
| |
| // Build MiNiFi processing graph |
| auto tail_file = plan->addProcessor("TailFile", "Tail"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::Delimiter, "\n"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::FileName, test_file.string()); |
| |
| std::vector<std::string> expected_log_lines; |
| |
| SECTION("If no pattern is set, we use the default, which is ${filename}.*, so the unrelated file will be picked up") { |
| expected_log_lines = std::vector<std::string>{"Logged 2 flow files", |
| "test.rolled.11-24.log", |
| "test.0-15.txt"}; |
| } |
| |
| SECTION("If a pattern is set to exclude the unrelated file, we no longer pick it up") { |
| plan->setProperty(tail_file, minifi::processors::TailFile::RollingFilenamePattern, "${filename}.*.log"); |
| |
| expected_log_lines = std::vector<std::string>{"Logged 1 flow file", |
| "test.rolled.11-24.log"}; |
| } |
| |
| SECTION("We can also set the pattern to not include the file name") { |
| plan->setProperty(tail_file, minifi::processors::TailFile::RollingFilenamePattern, "other_roll??.log"); |
| |
| expected_log_lines = std::vector<std::string>{"Logged 1 flow file", |
| "other_rolled.11-24.log"}; |
| } |
| |
| auto log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true); |
| plan->setProperty(log_attr, minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:test.0-10.log")); |
| |
| appendTempFile(dir, "test.log", "one more line\n"); |
| std::filesystem::rename(dir / "test.log", dir / "test.rolled.log"); |
| createTempFile(dir, "test.txt", "unrelated stuff\n"); |
| createTempFile(dir, "other_rolled.log", "some stuff\none more line\n"); // same contents as test.rolled.log |
| |
| plan->reset(); |
| LogTestController::getInstance().clear(); |
| |
| TestController::runSession(plan, true); |
| |
| for (const auto &log_line : expected_log_lines) { |
| REQUIRE(LogTestController::getInstance().contains(log_line)); |
| } |
| } |
| |
| TEST_CASE("TailFile finds and finishes the renamed file and continues with the new log file after a restart", "[rotation][restart]") { |
| TestController testController; |
| |
| LogTestController::getInstance().setTrace<TestPlan>(); |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>(); |
| |
| auto log_dir = testController.createTempDirectory(); |
| |
| const auto file_modi_time_t0 = std::chrono::file_clock::now(); |
| [[maybe_unused]] auto test_file_1 = createTempFile(log_dir, "test.1", "line one\nline two\nline three\n"); // old rotated file |
| std::filesystem::last_write_time(log_dir/"test.1", file_modi_time_t0 - 1100ms); |
| auto test_file = createTempFile(log_dir, "test.log", "line four\nline five\nline six\n"); // current log file |
| std::filesystem::last_write_time(log_dir/"test.log", file_modi_time_t0 - 100ms); |
| |
| auto state_dir = testController.createTempDirectory(); |
| |
| utils::Identifier tail_file_uuid = utils::IdGenerator::getIdGenerator()->generate(); |
| const core::Relationship success_relationship{"success", "everything is fine"}; |
| |
| // use persistent state storage that defaults to rocksDB, not volatile |
| const auto configuration = std::make_shared<minifi::ConfigureImpl>(); |
| { |
| auto test_plan = testController.createPlan(configuration, state_dir); |
| auto tail_file = test_plan->addProcessor("TailFile", tail_file_uuid, "Tail", {success_relationship}); |
| test_plan->setProperty(tail_file, minifi::processors::TailFile::FileName, test_file.string()); |
| auto log_attr = test_plan->addProcessor("LogAttribute", "Log", success_relationship, true); |
| test_plan->setProperty(log_attr, minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| test_plan->setProperty(log_attr, minifi::processors::LogAttribute::LogPayload, "true"); |
| |
| TestController::runSession(test_plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 3 flow files")); |
| } |
| |
| LogTestController::getInstance().clear(); |
| |
| appendTempFile(log_dir, "test.log", "line seven\n"); |
| std::filesystem::rename(log_dir / "test.1", log_dir / "test.2"); |
| std::filesystem::rename(log_dir / "test.log", log_dir / "test.1"); |
| createTempFile(log_dir, "test.log", "line eight is the last line\n"); |
| |
| { |
| auto test_plan = testController.createPlan(configuration, state_dir); |
| auto tail_file = test_plan->addProcessor("TailFile", tail_file_uuid, "Tail", {success_relationship}); |
| test_plan->setProperty(tail_file, minifi::processors::TailFile::FileName, test_file.string()); |
| auto log_attr = test_plan->addProcessor("LogAttribute", "Log", success_relationship, true); |
| test_plan->setProperty(log_attr, minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| test_plan->setProperty(log_attr, minifi::processors::LogAttribute::LogPayload, "true"); |
| |
| TestController::runSession(test_plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:test.29-39.1")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:test.0-27.log")); |
| } |
| } |
| |
| TEST_CASE("TailFile yields if no work is done", "[yield]") { |
| TestController testController; |
| |
| LogTestController::getInstance().setTrace<TestPlan>(); |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>(); |
| |
| auto temp_directory = testController.createTempDirectory(); |
| auto plan = testController.createPlan(); |
| auto tail_file = plan->addProcessor("TailFile", "Tail"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::Delimiter, "\n"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::TailMode, "Multiple file"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::FileName, ".*\\.log"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::BaseDirectory, temp_directory.string()); |
| plan->setProperty(tail_file, minifi::processors::TailFile::LookupFrequency, "0 sec"); |
| |
| SECTION("Empty log file => yield") { |
| createTempFile(temp_directory, "first.log", ""); |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(tail_file->getYieldTime() > 0ms); |
| |
| SECTION("No logging happened between onTrigger calls => yield") { |
| plan->reset(); |
| tail_file->clearYield(); |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(tail_file->getYieldTime() > 0ms); |
| } |
| |
| SECTION("Some logging happened between onTrigger calls => don't yield") { |
| plan->reset(); |
| tail_file->clearYield(); |
| |
| appendTempFile(temp_directory, "first.log", "stuff stuff\nand stuff\n"); |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(tail_file->getYieldTime() == 0ms); |
| } |
| } |
| |
| SECTION("Non-empty log file => don't yield") { |
| createTempFile(temp_directory, "second.log", "some content\n"); |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(tail_file->getYieldTime() == 0ms); |
| |
| SECTION("No logging happened between onTrigger calls => yield") { |
| plan->reset(); |
| tail_file->clearYield(); |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(tail_file->getYieldTime() > 0ms); |
| } |
| |
| SECTION("Some logging happened between onTrigger calls => don't yield") { |
| plan->reset(); |
| tail_file->clearYield(); |
| |
| appendTempFile(temp_directory, "second.log", "stuff stuff\nand stuff\n"); |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(tail_file->getYieldTime() == 0ms); |
| } |
| } |
| } |
| |
| TEST_CASE("TailFile yields if no work is done on any files", "[yield][multiple_file]") { |
| TestController testController; |
| |
| LogTestController::getInstance().setTrace<TestPlan>(); |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>(); |
| |
| const auto temp_directory = testController.createTempDirectory(); |
| const auto plan = testController.createPlan(); |
| const auto tail_file = plan->addProcessor("TailFile", "Tail"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::Delimiter, "\n"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::TailMode, "Multiple file"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::FileName, ".*\\.log"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::BaseDirectory, temp_directory.string()); |
| plan->setProperty(tail_file, minifi::processors::TailFile::LookupFrequency, "0 sec"); |
| |
| createTempFile(temp_directory, "first.log", "stuff\n"); |
| createTempFile(temp_directory, "second.log", "different stuff\n"); |
| createTempFile(temp_directory, "third.log", "stuff stuff\n"); |
| |
| TestController::runSession(plan, true); |
| plan->reset(); |
| tail_file->clearYield(); |
| |
| SECTION("No file changed => yield") { |
| TestController::runSession(plan, true); |
| |
| REQUIRE(tail_file->getYieldTime() > 0ms); |
| } |
| |
| SECTION("One file changed => don't yield") { |
| SECTION("first") { appendTempFile(temp_directory, "first.log", "more stuff\n"); } |
| SECTION("second") { appendTempFile(temp_directory, "second.log", "more stuff\n"); } |
| SECTION("third") { appendTempFile(temp_directory, "third.log", "more stuff\n"); } |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(tail_file->getYieldTime() == 0ms); |
| } |
| |
| SECTION("More than one file changed => don't yield") { |
| SECTION("first and third") { |
| appendTempFile(temp_directory, "first.log", "more stuff\n"); |
| appendTempFile(temp_directory, "third.log", "more stuff\n"); |
| } |
| SECTION("all of them") { |
| appendTempFile(temp_directory, "first.log", "more stuff\n"); |
| appendTempFile(temp_directory, "second.log", "more stuff\n"); |
| appendTempFile(temp_directory, "third.log", "more stuff\n"); |
| } |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(tail_file->getYieldTime() == 0ms); |
| } |
| } |
| |
| TEST_CASE("TailFile doesn't yield if work was done on rotated files only", "[yield][rotation]") { |
| TestController testController; |
| |
| LogTestController::getInstance().setTrace<TestPlan>(); |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>(); |
| |
| const auto file_modi_time_t0 = std::chrono::file_clock::now(); |
| const auto temp_directory = testController.createTempDirectory(); |
| const auto full_file_name = createTempFile(temp_directory, "test.log", "stuff\n"); |
| std::filesystem::last_write_time(full_file_name, file_modi_time_t0 - 100ms); |
| |
| const auto plan = testController.createPlan(); |
| |
| const auto tail_file = plan->addProcessor("TailFile", "Tail"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::Delimiter, "\n"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::FileName, full_file_name.string()); |
| |
| TestController::runSession(plan, true); |
| |
| plan->reset(); |
| tail_file->clearYield(); |
| |
| SECTION("File rotated but not written => yield") { |
| std::filesystem::rename(temp_directory / "test.log", temp_directory / "test.1"); |
| |
| SECTION("Don't create empty new log file") { |
| } |
| SECTION("Create empty new log file") { |
| createTempFile(temp_directory, "test.log", ""); |
| } |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(tail_file->getYieldTime() > 0ms); |
| } |
| |
| SECTION("File rotated and new stuff is added => don't yield") { |
| SECTION("New content before rotation") { |
| appendTempFile(temp_directory, "test.log", "more stuff\n"); |
| } |
| |
| std::filesystem::rename(temp_directory / "test.log", temp_directory / "test.1"); |
| |
| SECTION("New content after rotation") { |
| createTempFile(temp_directory, "test.log", "even more stuff\n"); |
| } |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(tail_file->getYieldTime() == 0ms); |
| } |
| } |
| |
| TEST_CASE("TailFile handles the Delimiter setting correctly", "[delimiter]") { |
| std::vector<std::pair<std::string, std::string>> test_cases = { |
| // first = value of Delimiter in the config |
| // second = the expected delimiter char which will be used |
| {"", ""}, {",", ","}, {"\t", "\t"}, {"\\t", "\t"}, {"\n", "\n"}, {"\\n", "\n"}, {"\\", "\\"}, {"\\\\", "\\"}}; |
| for (const auto &test_case : test_cases) { |
| TestController testController; |
| |
| LogTestController::getInstance().setTrace<TestPlan>(); |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>(); |
| |
| auto temp_directory = testController.createTempDirectory(); |
| |
| std::string delimiter = test_case.second; |
| auto full_file_name = createTempFile(temp_directory, "test.log", utils::string::join_pack("one", delimiter, "two", delimiter)); |
| |
| auto plan = testController.createPlan(); |
| |
| auto tail_file = plan->addProcessor("TailFile", "Tail"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::Delimiter, test_case.first); |
| plan->setProperty(tail_file, minifi::processors::TailFile::FileName, full_file_name.string()); |
| |
| auto log_attribute = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true); |
| plan->setProperty(log_attribute, minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| |
| TestController::runSession(plan, true); |
| |
| if (delimiter.empty()) { |
| REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:test.0-5.log")); |
| } else { |
| REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:test.0-3.log")); |
| REQUIRE(LogTestController::getInstance().contains("key:filename value:test.4-7.log")); |
| } |
| } |
| } |
| |
| TEST_CASE("TailFile handles Unix/Windows line endings correctly", "[simple]") { |
| TestController testController; |
| |
| LogTestController::getInstance().setTrace<TestPlan>(); |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>(); |
| |
| auto temp_directory = testController.createTempDirectory(); |
| auto full_file_name = createTempFile(temp_directory, "test.log", "line1\nline two\n", std::ios::out); // write in text mode |
| |
| auto plan = testController.createPlan(); |
| |
| auto tail_file = plan->addProcessor("TailFile", "Tail"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::FileName, full_file_name.string()); |
| |
| auto log_attribute = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true); |
| plan->setProperty(log_attribute, minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| |
| TestController::runSession(plan, true); |
| |
| #ifdef WIN32 |
| std::size_t line_ending_size = 2; |
| #else |
| std::size_t line_ending_size = 1; |
| #endif |
| REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(5 + line_ending_size) + " Offset:0")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(8 + line_ending_size) + " Offset:0")); |
| } |
| |
| TEST_CASE("TailFile can tail all files in a directory recursively", "[multiple]") { |
| TestController testController; |
| |
| LogTestController::getInstance().setTrace<TestPlan>(); |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>(); |
| |
| auto base_directory = testController.createTempDirectory(); |
| auto directory1 = base_directory / "one"; |
| utils::file::FileUtils::create_dir(directory1); |
| auto directory11 = directory1 / "one_child"; |
| utils::file::FileUtils::create_dir(directory11); |
| auto directory2 = base_directory / "two"; |
| utils::file::FileUtils::create_dir(directory2); |
| |
| createTempFile(base_directory, "test.orange.log", "orange juice\n"); |
| createTempFile(directory1, "test.blue.log", "blue\n"); |
| createTempFile(directory1, "test.orange.log", "orange autumn leaves\n"); |
| createTempFile(directory11, "test.camel.log", "camel\n"); |
| createTempFile(directory2, "test.triangle.log", "triangle\n"); |
| |
| auto plan = testController.createPlan(); |
| |
| auto tail_file = plan->addProcessor("TailFile", "Tail"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::TailMode, "Multiple file"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::BaseDirectory, base_directory.string()); |
| plan->setProperty(tail_file, minifi::processors::TailFile::LookupFrequency, "0 sec"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::FileName, ".*\\.log"); |
| |
| auto log_attribute = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true); |
| plan->setProperty(log_attribute, minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| plan->setProperty(log_attribute, minifi::processors::LogAttribute::LogPayload, "true"); |
| |
| SECTION("Recursive lookup not set => defaults to false") { |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file")); |
| } |
| |
| SECTION("Recursive lookup set to false") { |
| plan->setProperty(tail_file, minifi::processors::TailFile::RecursiveLookup, "false"); |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 1 flow file")); |
| } |
| |
| SECTION("Recursive lookup set to true") { |
| plan->setProperty(tail_file, minifi::processors::TailFile::RecursiveLookup, "true"); |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 5 flow files")); |
| } |
| } |
| |
| TEST_CASE("TailFile interprets the lookup frequency property correctly", "[multiple]") { |
| TestController testController; |
| |
| LogTestController::getInstance().setTrace<TestPlan>(); |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>(); |
| |
| auto directory = testController.createTempDirectory(); |
| createTempFile(directory, "test.red.log", "cherry\n"); |
| |
| auto plan = testController.createPlan(); |
| |
| auto tail_file = plan->addProcessor<minifi::processors::TailFile>("Tail"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::TailMode, "Multiple file"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::BaseDirectory, directory.string()); |
| plan->setProperty(tail_file, minifi::processors::TailFile::FileName, ".*\\.log"); |
| |
| auto log_attribute = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true); |
| plan->setProperty(log_attribute, minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| |
| SECTION("Lookup frequency not set => defaults to 10 minutes") { |
| TestController::runSession(plan, true); |
| |
| REQUIRE(tail_file.get().getLookupFrequency() == std::chrono::minutes{10}); |
| } |
| |
| SECTION("Lookup frequency set to zero => new files are picked up immediately") { |
| plan->setProperty(tail_file, minifi::processors::TailFile::LookupFrequency, "0 sec"); |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files")); |
| |
| plan->reset(); |
| LogTestController::getInstance().clear(); |
| |
| createTempFile(directory, "test.blue.log", "sky\n"); |
| createTempFile(directory, "test.green.log", "grass\n"); |
| |
| TestController::runSession(plan, true); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); |
| } |
| |
| SECTION("Lookup frequency set to 500 ms => new files are only picked up after 500 ms") { |
| plan->setProperty(tail_file, minifi::processors::TailFile::LookupFrequency, "500 ms"); |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files")); |
| |
| plan->reset(); |
| LogTestController::getInstance().clear(); |
| |
| createTempFile(directory, "test.blue.log", "sky\n"); |
| createTempFile(directory, "test.green.log", "grass\n"); |
| |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files")); |
| |
| plan->reset(); |
| LogTestController::getInstance().clear(); |
| |
| std::this_thread::sleep_for(std::chrono::milliseconds(550)); |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); |
| } |
| |
| SECTION("Lookup frequency set to a thousand years => files already present when started are still picked up") { |
| plan->setProperty(tail_file, minifi::processors::TailFile::LookupFrequency, "365000 days"); |
| TestController::runSession(plan, true); |
| REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files")); |
| } |
| } |
| |
| TEST_CASE("TailFile reads from a single file when Initial Start Position is set", "[initialStartPosition]") { |
| TestController testController; |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tailfile = plan->addProcessor("TailFile", "tailfileProc"); |
| auto logattribute = plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true); |
| |
| auto dir = testController.createTempDirectory(); |
| createTempFile(dir, ROLLED_OVER_TMP_FILE, ROLLED_OVER_TAIL_DATA); |
| auto temp_file_path = createTempFile(dir, TMP_FILE, NEWLINE_FILE); |
| |
| plan->setProperty(logattribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, temp_file_path.string()); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); |
| |
| SECTION("Initial Start Position is set to Beginning of File") { |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::InitialStartPosition, "Beginning of File"); |
| |
| TestController::runSession(plan); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.find_first_of('\n') + 1) + " Offset:0")); |
| |
| plan->reset(true); |
| LogTestController::getInstance().clear(); |
| |
| appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA); |
| |
| TestController::runSession(plan); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.size() - NEWLINE_FILE.find_first_of('\n') + NEW_TAIL_DATA.find_first_of('\n')) + " Offset:0")); |
| } |
| |
| SECTION("Initial Start Position is set to Beginning of Time") { |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::InitialStartPosition, "Beginning of Time"); |
| |
| TestController::runSession(plan); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.find_first_of('\n') + 1) + " Offset:0")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(ROLLED_OVER_TAIL_DATA.find_first_of('\n') + 1) + " Offset:0")); |
| |
| plan->reset(true); |
| LogTestController::getInstance().clear(); |
| |
| appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA); |
| |
| TestController::runSession(plan); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.size() - NEWLINE_FILE.find_first_of('\n') + NEW_TAIL_DATA.find_first_of('\n')) + " Offset:0")); |
| } |
| |
| SECTION("Initial Start Position is set to Current Time") { |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::InitialStartPosition, "Current Time"); |
| |
| TestController::runSession(plan); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files")); |
| |
| plan->reset(true); |
| LogTestController::getInstance().clear(); |
| |
| appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA); |
| |
| TestController::runSession(plan); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEW_TAIL_DATA.find_first_of('\n') + 1) + " Offset:0")); |
| } |
| |
| LogTestController::getInstance().reset(); |
| } |
| |
| TEST_CASE("TailFile reads from a single file when Initial Start Position is set to Current Time with rollover", "[initialStartPosition]") { |
| TestController testController; |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tailfile = plan->addProcessor("TailFile", "tailfileProc"); |
| auto logattribute = plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true); |
| |
| const auto file_modi_time_t0 = std::chrono::file_clock::now(); |
| auto dir = testController.createTempDirectory(); |
| auto temp_file_path = createTempFile(dir, TMP_FILE, NEWLINE_FILE); |
| std::filesystem::last_write_time(temp_file_path, file_modi_time_t0 - 100ms); |
| |
| plan->setProperty(logattribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, temp_file_path.string()); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::InitialStartPosition, "Current Time"); |
| |
| TestController::runSession(plan); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files")); |
| |
| plan->reset(true); |
| LogTestController::getInstance().clear(); |
| |
| const std::string DATA_IN_NEW_FILE = "data in new file\n"; |
| appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA); |
| std::filesystem::rename(dir / TMP_FILE, dir / ROLLED_OVER_TMP_FILE); |
| createTempFile(dir, TMP_FILE, DATA_IN_NEW_FILE); |
| |
| TestController::runSession(plan); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEW_TAIL_DATA.find_first_of('\n') + 1) + " Offset:0")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(DATA_IN_NEW_FILE.find_first_of('\n') + 1) + " Offset:0")); |
| |
| LogTestController::getInstance().reset(); |
| } |
| |
| TEST_CASE("TailFile reads multiple files when Initial Start Position is set", "[initialStartPosition]") { |
| TestController testController; |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tailfile = plan->addProcessor("TailFile", "tailfileProc"); |
| auto logattribute = plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true); |
| |
| auto dir = testController.createTempDirectory(); |
| createTempFile(dir, ROLLED_OVER_TMP_FILE, ROLLED_OVER_TAIL_DATA); |
| createTempFile(dir, TMP_FILE, NEWLINE_FILE); |
| const std::string TMP_FILE_2_DATA = "tmp_file_2_new_line_data\n"; |
| createTempFile(dir, "minifi-tmpfile-2.txt", TMP_FILE_2_DATA); |
| |
| plan->setProperty(logattribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, ".*\\.txt"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode, "Multiple file"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::BaseDirectory, dir.string()); |
| |
| SECTION("Initial Start Position is set to Beginning of File") { |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::InitialStartPosition, "Beginning of File"); |
| |
| TestController::runSession(plan); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.find_first_of('\n') + 1) + " Offset:0")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(TMP_FILE_2_DATA.find_first_of('\n') + 1) + " Offset:0")); |
| |
| plan->reset(true); |
| LogTestController::getInstance().clear(); |
| |
| createTempFile(dir, "minifi-tmpfile-3.txt", ADDITIONALY_CREATED_FILE_CONTENT); |
| appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA); |
| |
| TestController::runSession(plan); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(ADDITIONALY_CREATED_FILE_CONTENT.find_first_of('\n') + 1) + " Offset:0")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.size() - NEWLINE_FILE.find_first_of('\n') + NEW_TAIL_DATA.find_first_of('\n')) + " Offset:0")); |
| } |
| |
| SECTION("Initial Start Position is set to Beginning of Time") { |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::InitialStartPosition, "Beginning of Time"); |
| |
| TestController::runSession(plan); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 3 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.find_first_of('\n') + 1) + " Offset:0")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(ROLLED_OVER_TAIL_DATA.find_first_of('\n') + 1) + " Offset:0")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(TMP_FILE_2_DATA.find_first_of('\n') + 1) + " Offset:0")); |
| |
| plan->reset(true); |
| LogTestController::getInstance().clear(); |
| |
| createTempFile(dir, "minifi-tmpfile-3.txt", ADDITIONALY_CREATED_FILE_CONTENT); |
| appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA); |
| |
| TestController::runSession(plan); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(ADDITIONALY_CREATED_FILE_CONTENT.find_first_of('\n') + 1) + " Offset:0")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.size() - NEWLINE_FILE.find_first_of('\n') + NEW_TAIL_DATA.find_first_of('\n')) + " Offset:0")); |
| } |
| |
| SECTION("Initial Start Position is set to Current Time") { |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::InitialStartPosition, "Current Time"); |
| |
| TestController::runSession(plan); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files")); |
| |
| plan->reset(true); |
| LogTestController::getInstance().clear(); |
| |
| createTempFile(dir, "minifi-tmpfile-3.txt", ADDITIONALY_CREATED_FILE_CONTENT); |
| appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA); |
| |
| TestController::runSession(plan); |
| |
| REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(ADDITIONALY_CREATED_FILE_CONTENT.find_first_of('\n') + 1) + " Offset:0")); |
| REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEW_TAIL_DATA.find_first_of('\n') + 1) + " Offset:0")); |
| } |
| |
| LogTestController::getInstance().reset(); |
| } |
| |
| TEST_CASE("Initial Start Position is set to invalid or empty value", "[initialStartPosition]") { |
| TestController testController; |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tailfile = plan->addProcessor("TailFile", "tailfileProc"); |
| auto logattribute = plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true); |
| |
| auto dir = testController.createTempDirectory(); |
| createTempFile(dir, ROLLED_OVER_TMP_FILE, ROLLED_OVER_TAIL_DATA); |
| auto temp_file_path = createTempFile(dir, TMP_FILE, NEWLINE_FILE); |
| |
| plan->setProperty(logattribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName, temp_file_path.string()); |
| plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); |
| |
| CHECK_FALSE(plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::InitialStartPosition, "")); |
| CHECK_FALSE(plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::InitialStartPosition, "Invalid Value")); |
| } |
| |
| TEST_CASE("TailFile onSchedule throws if an invalid Attribute Provider Service is found", "[configuration][AttributeProviderService]") { |
| TestController testController; |
| LogTestController::getInstance().setDebug<minifi::processors::TailFile>(); |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| auto tail_file = plan->addProcessor("TailFile", "tailfileProc"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::TailMode, "Multiple file"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::BaseDirectory, "/var/logs"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::FileName, "minifi.log"); |
| plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::AttributeProviderService, "this AttributeProviderService does not exist"); |
| |
| REQUIRE_THROWS_AS(plan->runNextProcessor(), minifi::Exception); |
| } |
| |
| namespace { |
| |
| class TestAttributeProviderService final : public minifi::controllers::AttributeProviderServiceImpl { |
| public: |
| using AttributeProviderServiceImpl::AttributeProviderServiceImpl; |
| |
| static constexpr const char* Description = "An attribute provider service which provides a constant set of records."; |
| static constexpr auto Properties = std::array<core::PropertyReference, 0>{}; |
| static constexpr bool SupportsDynamicProperties = false; |
| ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES |
| |
| void initialize() override {}; |
| void onEnable() override {}; |
| std::optional<std::vector<AttributeMap>> getAttributes() override { |
| return std::vector<AttributeMap>{AttributeMap{{"color", "red"}, {"fruit", "apple"}, {"uid", "001"}, {"animal", "dog"}}, |
| AttributeMap{{"color", "yellow"}, {"fruit", "banana"}, {"uid", "004"}, {"animal", "dolphin"}}}; |
| } |
| [[nodiscard]] std::string_view name() const override { return "test"; } |
| }; |
| REGISTER_RESOURCE(TestAttributeProviderService, ControllerService); |
| |
| } // namespace |
| |
| TEST_CASE("TailFile can use an AttributeProviderService", "[AttributeProviderService]") { |
| TestController testController; |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| LogTestController::getInstance().setDebug<core::ProcessSession>(); |
| LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); |
| |
| std::filesystem::path temp_directory{testController.createTempDirectory()}; |
| |
| std::shared_ptr<TestPlan> plan = testController.createPlan(); |
| plan->addController("TestAttributeProviderService", "attribute_provider_service"); |
| auto tail_file = plan->addProcessor("TailFile", "tail_file"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::TailMode, "Multiple file"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::BaseDirectory, (temp_directory / "my_${color}_${fruit}_${uid}" / "${animal}").string()); |
| plan->setProperty(tail_file, minifi::processors::TailFile::LookupFrequency, "0 sec"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::FileName, ".*\\.log"); |
| plan->setProperty(tail_file, minifi::processors::TailFile::AttributeProviderService, "attribute_provider_service"); |
| auto log_attribute = plan->addProcessor("LogAttribute", "log_attribute", core::Relationship("success", ""), true); |
| plan->setProperty(log_attribute, minifi::processors::LogAttribute::FlowFilesToLog, "0"); |
| |
| createTempFile(temp_directory / "my_red_apple_001" / "dog", "0.log", "Idared\n"); |
| createTempFile(temp_directory / "my_red_apple_001" / "dog", "1.log", "Jonagold\n"); |
| createTempFile(temp_directory / "my_red_strawberry_002" / "elephant", "0.log", "red strawberry\n"); |
| createTempFile(temp_directory / "my_yellow_apple_003" / "horse", "0.log", "yellow apple\n"); |
| createTempFile(temp_directory / "my_yellow_banana_004" / "dolphin", "0.log", "yellow banana\n"); |
| |
| TestController::runSession(plan); |
| |
| CHECK(LogTestController::getInstance().contains("Logged 3 flow files")); |
| CHECK(LogTestController::getInstance().contains("key:absolute.path value:" + (temp_directory / "my_red_apple_001" / "dog" / "0.log").string())); |
| CHECK(LogTestController::getInstance().contains("key:absolute.path value:" + (temp_directory / "my_red_apple_001" / "dog" / "1.log").string())); |
| CHECK(LogTestController::getInstance().contains("key:absolute.path value:" + (temp_directory / "my_yellow_banana_004" / "dolphin" / "0.log").string())); |
| |
| CHECK(LogTestController::getInstance().contains("key:test.color value:red")); |
| CHECK(LogTestController::getInstance().contains("key:test.color value:yellow")); |
| CHECK(LogTestController::getInstance().contains("key:test.fruit value:apple")); |
| CHECK(LogTestController::getInstance().contains("key:test.fruit value:banana")); |
| CHECK(LogTestController::getInstance().contains("key:test.uid value:001")); |
| CHECK(LogTestController::getInstance().contains("key:test.uid value:004")); |
| CHECK(LogTestController::getInstance().contains("key:test.animal value:dog")); |
| CHECK(LogTestController::getInstance().contains("key:test.animal value:dolphin")); |
| |
| CHECK_FALSE(LogTestController::getInstance().contains("key:test.fruit value:strawberry", 0s, 0ms)); |
| CHECK_FALSE(LogTestController::getInstance().contains("key:test.uid value:002", 0s, 0ms)); |
| CHECK_FALSE(LogTestController::getInstance().contains("key:test.uid value:003", 0s, 0ms)); |
| CHECK_FALSE(LogTestController::getInstance().contains("key:test.animal value:elephant", 0s, 0ms)); |
| CHECK_FALSE(LogTestController::getInstance().contains("key:test.animal value:horse", 0s, 0ms)); |
| |
| LogTestController::getInstance().reset(); |
| } |
| |
| TEST_CASE("TailFile honors batch size for maximum lines processed", "[batchSize]") { |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| |
| minifi::test::SingleProcessorTestController test_controller(minifi::test::utils::make_processor<minifi::processors::TailFile>("TailFile")); |
| auto tailfile = test_controller.getProcessor(); |
| |
| auto temp_file_path = test_controller.createTempDirectory() / TMP_FILE; |
| |
| std::ofstream tmpfile; |
| tmpfile.open(temp_file_path, std::ios::out | std::ios::binary); |
| for (auto i = 0; i < 20; ++i) { |
| tmpfile << NEW_TAIL_DATA; |
| } |
| tmpfile.close(); |
| |
| REQUIRE(tailfile->setProperty(minifi::processors::TailFile::FileName.name, temp_file_path.string())); |
| REQUIRE(tailfile->setProperty(minifi::processors::TailFile::Delimiter.name, "\n")); |
| REQUIRE(tailfile->setProperty(minifi::processors::TailFile::BatchSize.name, "10")); |
| |
| const auto result = test_controller.trigger(); |
| const auto& file_contents = result.at(minifi::processors::TailFile::Success); |
| REQUIRE(file_contents.size() == 10); |
| } |
| |
| TEST_CASE("Result mode tests") { |
| LogTestController::getInstance().setTrace<minifi::processors::TailFile>(); |
| |
| minifi::test::SingleProcessorTestController test_controller(minifi::test::utils::make_processor<minifi::processors::TailFile>("TailFile")); |
| auto tail_file = test_controller.getProcessor(); |
| |
| auto temp_file_path = test_controller.createTempDirectory() / TMP_FILE; |
| |
| const auto [result_mode, ff_count] = GENERATE( |
| std::make_tuple(minifi::processors::TailResultFormat::FlowFilePerBatch, std::size_t{1}), |
| std::make_tuple(minifi::processors::TailResultFormat::FlowFilePerDelimiter, std::size_t{5})); |
| |
| std::ofstream tmp_file; |
| tmp_file.open(temp_file_path, std::ios::out | std::ios::binary); |
| for (auto i = 0; i < 20; ++i) { |
| tmp_file << NEW_TAIL_DATA; |
| } |
| tmp_file.close(); |
| |
| CHECK(tail_file->setProperty(minifi::processors::TailFile::FileName.name, temp_file_path.string())); |
| CHECK(tail_file->setProperty(minifi::processors::TailFile::Delimiter.name, "\n")); |
| CHECK(tail_file->setProperty(minifi::processors::TailFile::BatchSize.name, "5")); |
| CHECK(tail_file->setProperty(minifi::processors::TailFile::ResultFormat.name, std::string(magic_enum::enum_name(result_mode)))); |
| |
| const auto result = test_controller.trigger(); |
| const auto& file_contents = result.at(minifi::processors::TailFile::Success); |
| CHECK(file_contents.size() == ff_count); |
| } |