MINIFICPP-1313 Do a multifile lookup in onSchedule
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #859
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 426667e..c5d2a1a 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -366,7 +366,9 @@
context->getProperty(LookupFrequency.getName(), lookup_frequency_);
- // in multiple mode, we check for new/removed files in every onTrigger
+ recoverState(context);
+
+ doMultifileLookup();
} else {
tail_mode_ = Mode::SINGLE;
@@ -378,13 +380,13 @@
} else {
throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to tail must be a fully qualified file");
}
+
+ recoverState(context);
}
std::string rolling_filename_pattern_glob;
context->getProperty(RollingFilenamePattern.getName(), rolling_filename_pattern_glob);
rolling_filename_pattern_ = utils::file::PathUtils::globToRegex(rolling_filename_pattern_glob);
-
- recoverState(context);
}
void TailFile::parseStateFileLine(char *buf, std::map<std::string, TailState> &state) const {
@@ -633,9 +635,7 @@
if (tail_mode_ == Mode::MULTIPLE) {
if (last_multifile_lookup_ + lookup_frequency_ < std::chrono::steady_clock::now()) {
logger_->log_debug("Lookup frequency %" PRId64 " ms have elapsed, doing new multifile lookup", int64_t{lookup_frequency_.count()});
- checkForRemovedFiles();
- checkForNewFiles();
- last_multifile_lookup_ = std::chrono::steady_clock::now();
+ doMultifileLookup();
} else {
logger_->log_trace("Skipping multifile lookup");
}
@@ -745,6 +745,12 @@
state.checksum_ = checksum;
}
+void TailFile::doMultifileLookup() {
+ checkForRemovedFiles();
+ checkForNewFiles();
+ last_multifile_lookup_ = std::chrono::steady_clock::now();
+}
+
void TailFile::checkForRemovedFiles() {
std::vector<std::string> file_names_to_remove;
diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h
index 080505c..c673601 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -168,6 +168,8 @@
bool getStateFromLegacyStateFile(const std::shared_ptr<core::ProcessContext>& context,
std::map<std::string, TailState> &new_tail_states) const;
+ void doMultifileLookup();
+
void checkForRemovedFiles();
void checkForNewFiles();
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index 4bd803b..7d0e2b7 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -1499,9 +1499,9 @@
auto log_attribute = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
plan->setProperty(log_attribute, processors::LogAttribute::FlowFilesToLog.getName(), "0");
- testController.runSession(plan, true);
-
SECTION("Lookup frequency not set => defaults to 10 minutes") {
+ testController.runSession(plan, true);
+
std::shared_ptr<processors::TailFile> tail_file_processor = std::dynamic_pointer_cast<processors::TailFile>(tail_file);
REQUIRE(tail_file_processor);
REQUIRE(tail_file_processor->getLookupFrequency() == std::chrono::minutes{10});
@@ -1509,8 +1509,10 @@
SECTION("Lookup frequency set to zero => new files are picked up immediately") {
plan->setProperty(tail_file, processors::TailFile::LookupFrequency.getName(), "0 sec");
+ testController.runSession(plan, true);
+ REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
- plan->reset(true);
+ plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
createTempFile(directory, "test.blue.log", "sky\n");
@@ -1523,8 +1525,10 @@
SECTION("Lookup frequency set to 100 ms => new files are only picked up after 100 ms") {
plan->setProperty(tail_file, processors::TailFile::LookupFrequency.getName(), "100 ms");
+ testController.runSession(plan, true);
+ REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
- plan->reset(true);
+ plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
createTempFile(directory, "test.blue.log", "sky\n");
@@ -1533,11 +1537,17 @@
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files"));
- plan->reset(false);
+ plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
std::this_thread::sleep_for(std::chrono::milliseconds(110));
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
}
+
+ SECTION("Lookup frequency set to a thousand years => files already present when started are still picked up") {
+ plan->setProperty(tail_file, processors::TailFile::LookupFrequency.getName(), "365000 days");
+ testController.runSession(plan, true);
+ REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
+ }
}