MINIFICPP-1313 Do a multifile lookup in onSchedule

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #859
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 426667e..c5d2a1a 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -366,7 +366,9 @@
 
     context->getProperty(LookupFrequency.getName(), lookup_frequency_);
 
-    // in multiple mode, we check for new/removed files in every onTrigger
+    recoverState(context);
+
+    doMultifileLookup();
 
   } else {
     tail_mode_ = Mode::SINGLE;
@@ -378,13 +380,13 @@
     } else {
       throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to tail must be a fully qualified file");
     }
+
+    recoverState(context);
   }
 
   std::string rolling_filename_pattern_glob;
   context->getProperty(RollingFilenamePattern.getName(), rolling_filename_pattern_glob);
   rolling_filename_pattern_ = utils::file::PathUtils::globToRegex(rolling_filename_pattern_glob);
-
-  recoverState(context);
 }
 
 void TailFile::parseStateFileLine(char *buf, std::map<std::string, TailState> &state) const {
@@ -633,9 +635,7 @@
   if (tail_mode_ == Mode::MULTIPLE) {
     if (last_multifile_lookup_ + lookup_frequency_ < std::chrono::steady_clock::now()) {
       logger_->log_debug("Lookup frequency %" PRId64 " ms have elapsed, doing new multifile lookup", int64_t{lookup_frequency_.count()});
-      checkForRemovedFiles();
-      checkForNewFiles();
-      last_multifile_lookup_ = std::chrono::steady_clock::now();
+      doMultifileLookup();
     } else {
       logger_->log_trace("Skipping multifile lookup");
     }
@@ -745,6 +745,12 @@
   state.checksum_ = checksum;
 }
 
+void TailFile::doMultifileLookup() {
+  checkForRemovedFiles();
+  checkForNewFiles();
+  last_multifile_lookup_ = std::chrono::steady_clock::now();
+}
+
 void TailFile::checkForRemovedFiles() {
   std::vector<std::string> file_names_to_remove;
 
diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h
index 080505c..c673601 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -168,6 +168,8 @@
   bool getStateFromLegacyStateFile(const std::shared_ptr<core::ProcessContext>& context,
                                    std::map<std::string, TailState> &new_tail_states) const;
 
+  void doMultifileLookup();
+
   void checkForRemovedFiles();
 
   void checkForNewFiles();
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index 4bd803b..7d0e2b7 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -1499,9 +1499,9 @@
   auto log_attribute = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
   plan->setProperty(log_attribute, processors::LogAttribute::FlowFilesToLog.getName(), "0");
 
-  testController.runSession(plan, true);
-
   SECTION("Lookup frequency not set => defaults to 10 minutes") {
+    testController.runSession(plan, true);
+
     std::shared_ptr<processors::TailFile> tail_file_processor = std::dynamic_pointer_cast<processors::TailFile>(tail_file);
     REQUIRE(tail_file_processor);
     REQUIRE(tail_file_processor->getLookupFrequency() == std::chrono::minutes{10});
@@ -1509,8 +1509,10 @@
 
   SECTION("Lookup frequency set to zero => new files are picked up immediately") {
     plan->setProperty(tail_file, processors::TailFile::LookupFrequency.getName(), "0 sec");
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
 
-    plan->reset(true);
+    plan->reset();
     LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
 
     createTempFile(directory, "test.blue.log", "sky\n");
@@ -1523,8 +1525,10 @@
 
   SECTION("Lookup frequency set to 100 ms => new files are only picked up after 100 ms") {
     plan->setProperty(tail_file, processors::TailFile::LookupFrequency.getName(), "100 ms");
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
 
-    plan->reset(true);
+    plan->reset();
     LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
 
     createTempFile(directory, "test.blue.log", "sky\n");
@@ -1533,11 +1537,17 @@
     testController.runSession(plan, true);
     REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files"));
 
-    plan->reset(false);
+    plan->reset();
     LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
 
     std::this_thread::sleep_for(std::chrono::milliseconds(110));
     testController.runSession(plan, true);
     REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
   }
+
+  SECTION("Lookup frequency set to a thousand years => files already present when started are still picked up") {
+    plan->setProperty(tail_file, processors::TailFile::LookupFrequency.getName(), "365000 days");
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
+  }
 }