MINIFICPP-30: Add support for regex with Multiple file mode

MINIFICPP-30: Update docs to make clear the impact of rollover on Multi file mode

MINIFICPP-30: Fix test issue

Approved by aboda on github

This closes #580.

Signed-off-by: Marc Parisi <phrocker@apache.org>
diff --git a/extensions/standard-processors/CMakeLists.txt b/extensions/standard-processors/CMakeLists.txt
index 7691c78..ddaf19f 100644
--- a/extensions/standard-processors/CMakeLists.txt
+++ b/extensions/standard-processors/CMakeLists.txt
@@ -31,7 +31,7 @@
   target_link_libraries(minifi-standard-processors "${CMAKE_THREAD_LIBS_INIT}")
 endif()
 
-
+target_link_libraries(minifi-standard-processors core-minifi)
 
 SET (STANDARD-PROCESSORS minifi-standard-processors PARENT_SCOPE)
 register_extension(minifi-standard-processors)
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 8302aeb..c282df2 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -37,8 +37,14 @@
 #include <string>
 #include <iostream>
 #include "utils/file/FileUtils.h"
+#include "utils/file/PathUtils.h"
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
+#ifdef HAVE_REGEX_CPP
+#include <regex>
+#else
+#include <regex.h>
+#endif
 #include "TailFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
@@ -61,21 +67,36 @@
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed", "");
+core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed when using single file mode, or a file regex when using multifile mode", "");
 core::Property TailFile::StateFile("State File", "Specifies the file that should be used for storing state about"
                                    " what data has been ingested so that upon restart NiFi can resume from where it left off",
                                    "TailFileState");
 core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character that should be used for delimiting the data being tailed"
                                    "from the incoming file.",
                                    "");
+
+core::Property TailFile::TailMode(
+    core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")->withDescription(
+        "Specifies the tail file mode. In 'Single file' mode only a single file will be watched. "
+        "In 'Multiple file' mode a regex may be used. Note that in multiple file mode we will still continue to watch for rollover on the initial set of watched files. "
+        "The Regex used to locate multiple files will be run during the schedule phrase. Note that if rotated files are matched by the regex, those files will be tailed.")->isRequired(true)
+        ->withAllowableValue<std::string>("Single file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")->build());
+
+core::Property TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory", "Base Directory")->isRequired(false)->build());
+
 core::Relationship TailFile::Success("success", "All files are routed to success");
 
+const char *TailFile::CURRENT_STR = "CURRENT.";
+const char *TailFile::POSITION_STR = "POSITION.";
+
 void TailFile::initialize() {
   // Set the supported properties
   std::set<core::Property> properties;
   properties.insert(FileName);
   properties.insert(StateFile);
   properties.insert(Delimiter);
+  properties.insert(TailMode);
+  properties.insert(BaseDirectory);
   setSupportedProperties(properties);
   // Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -83,12 +104,65 @@
   setSupportedRelationships(relationships);
 }
 
-void TailFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
+
   std::string value;
 
   if (context->getProperty(Delimiter.getName(), value)) {
     delimiter_ = value;
   }
+
+  std::string mode;
+  context->getProperty(TailMode.getName(), mode);
+
+  std::string file = "";
+  if (!context->getProperty(FileName.getName(), file)) {
+    throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to Tail is a required property");
+  }
+  if (mode == "Multiple file") {
+    // file is a regex
+    std::string base_dir;
+    if (!context->getProperty(BaseDirectory.getName(), base_dir)) {
+      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base directory is required for multiple tail mode.");
+    }
+
+    auto fileRegexSelect = [&](const std::string& path, const std::string& filename) -> bool {
+      if (acceptFile(file, filename)) {
+        tail_states_.insert(std::make_pair(filename, TailState {path, filename, 0, 0}));
+      }
+      return true;
+    };
+
+    utils::file::FileUtils::list_dir(base_dir, fileRegexSelect, logger_, false);
+
+  } else {
+    std::string fileLocation, fileName;
+    if (utils::file::PathUtils::getFileNameAndPath(file, fileLocation, fileName)) {
+      tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, fileName, 0, 0 }));
+    } else {
+      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to tail must be a fully qualified file");
+    }
+  }
+}
+
+bool TailFile::acceptFile(const std::string &fileFilter, const std::string &file) {
+#ifndef HAVE_REGEX_CPP
+  regex_t regex;
+  int ret = regcomp(&regex, fileFilter.c_str(), 0);
+  if (ret)
+  return false;
+  ret = regexec(&regex, file.c_str(), (size_t) 0, NULL, 0);
+  regfree(&regex);
+  if (ret)
+  return false;
+#else
+  std::regex regex(fileFilter);
+  if (!std::regex_match(file, regex)) {
+    return false;
+  }
+  return true;
+#endif
 }
 
 std::string TailFile::trimLeft(const std::string& s) {
@@ -131,51 +205,82 @@
   key = trimRight(key);
   value = trimRight(value);
 
-  if (key == "FILENAME")
-    this->_currentTailFileName = value;
-  if (key == "POSITION")
-    this->_currentTailFilePosition = std::stoi(value);
+  if (key == "FILENAME") {
+    std::string fileLocation, fileName;
+    if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
+      tail_states_.insert(std::make_pair(value, TailState { fileLocation, fileName, 0, 0 }));
+    } else {
+      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file contains an invalid file name");
+    }
+  }
+  if (key == "POSITION") {
+    // for backwards compatibility
+    if (tail_states_.size() != 1) {
+      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Incompatible state file types");
+    }
+    tail_states_.begin()->second.currentTailFilePosition_ = std::stoi(value);
+  }
+  if (key.find(CURRENT_STR) == 0) {
+    const auto file = key.substr(strlen(CURRENT_STR) + 1);
+    std::string fileLocation, fileName;
+    if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
+      tail_states_[file].path_ = fileLocation;
+      tail_states_[file].current_file_name_ = fileName;
+    } else {
+      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file contains an invalid file name");
+    }
+  }
+
+  if (key.find("POSITION.") == 0) {
+    const auto file = key.substr(strlen(POSITION_STR) + 1);
+    tail_states_[file].currentTailFilePosition_ = std::stoi(value);
+  }
 
   return;
 }
 
-void TailFile::recoverState() {
-  std::ifstream file(_stateFile.c_str(), std::ifstream::in);
+bool TailFile::recoverState() {
+  std::ifstream file(state_file_.c_str(), std::ifstream::in);
   if (!file.good()) {
-    logger_->log_error("load state file failed %s", _stateFile);
-    return;
+    logger_->log_error("load state file failed %s", state_file_);
+    return false;
   }
+  tail_states_.clear();
   char buf[BUFFER_SIZE];
   for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) {
     parseStateFileLine(buf);
   }
+  return true;
 }
 
 void TailFile::storeState() {
-  std::ofstream file(_stateFile.c_str());
+  std::ofstream file(state_file_.c_str());
   if (!file.is_open()) {
-    logger_->log_error("store state file failed %s", _stateFile);
+    logger_->log_error("store state file failed %s", state_file_);
     return;
   }
-  file << "FILENAME=" << this->_currentTailFileName << "\n";
-  file << "POSITION=" << this->_currentTailFilePosition << "\n";
+  for (const auto &state : tail_states_) {
+    file << "FILENAME=" << state.first << "\n";
+    file << CURRENT_STR << state.first << "=" << state.second.path_ << utils::file::FileUtils::get_separator() << state.second.current_file_name_ << "\n";
+    file << POSITION_STR << state.first << "=" << state.second.currentTailFilePosition_ << "\n";
+  }
   file.close();
 }
 
 static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) {
   return (i.modifiedTime < j.modifiedTime);
 }
-void TailFile::checkRollOver(const std::string &fileLocation, const std::string &baseFileName) {
+void TailFile::checkRollOver(TailState &file, const std::string &base_file_name) {
   struct stat statbuf;
   std::vector<TailMatchedFileItem> matchedFiles;
-  std::string fullPath = fileLocation + "/" + _currentTailFileName;
+  std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() + file.current_file_name_;
 
   if (stat(fullPath.c_str(), &statbuf) == 0) {
     logger_->log_trace("Searching for files rolled over");
-    std::string pattern = baseFileName;
-    std::size_t found = baseFileName.find_last_of(".");
+    std::string pattern = file.current_file_name_;
+    std::size_t found = file.current_file_name_.find_last_of(".");
     if (found != std::string::npos)
-      pattern = baseFileName.substr(0, found);
+      pattern = file.current_file_name_.substr(0, found);
 
     // Callback, called for each file entry in the listed directory
     // Return value is used to break (false) or continue (true) listing
@@ -184,21 +289,20 @@
       std::string fileFullName = path + utils::file::FileUtils::get_separator() + filename;
       if ((fileFullName.find(pattern) != std::string::npos) && stat(fileFullName.c_str(), &sb) == 0) {
         uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
-        if (candidateModTime >= _currentTailFileModificationTime) {
-          if (filename == _currentTailFileName && candidateModTime == _currentTailFileModificationTime &&
-          sb.st_size == _currentTailFilePosition) {
+        if (candidateModTime >= file.currentTailFileModificationTime_) {
+          if (filename == file.current_file_name_ && candidateModTime == file.currentTailFileModificationTime_ &&
+              sb.st_size == file.currentTailFilePosition_) {
             return true;  // Skip the current file as a candidate in case it wasn't updated
-          }
-          TailMatchedFileItem item;
-          item.fileName = filename;
-          item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000);
-          matchedFiles.push_back(item);
-        }
       }
-      return true;
-    };
+      TailMatchedFileItem item;
+      item.fileName = filename;
+      item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000);
+      matchedFiles.push_back(item);
+    }
+  }
+  return true;};
 
-    utils::file::FileUtils::list_dir(fileLocation, lambda, logger_, false);
+    utils::file::FileUtils::list_dir(file.path_, lambda, logger_, false);
 
     if (matchedFiles.size() < 1) {
       logger_->log_debug("No newer files found in directory!");
@@ -208,112 +312,113 @@
     // Sort the list based on modified time
     std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem);
     TailMatchedFileItem item = matchedFiles[0];
-    logger_->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName, item.fileName);
+    logger_->log_info("TailFile File Roll Over from %s to %s", file.current_file_name_, item.fileName);
 
     // Going ahead in the file rolled over
-    if (_currentTailFileName != baseFileName) {
-      _currentTailFilePosition = 0;
+    if (file.current_file_name_ != base_file_name) {
+      file.currentTailFilePosition_ = 0;
     }
-    _currentTailFileName = item.fileName;
+
+    file.current_file_name_ = item.fileName;
+
     storeState();
   } else {
     return;
   }
 }
 
-void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
-  std::string value;
-  std::string fileLocation = "";
-  std::string fileName = "";
-  if (context->getProperty(FileName.getName(), value)) {
-    std::size_t found = value.find_last_of(utils::file::FileUtils::get_separator());
-    fileLocation = value.substr(0, found);
-    fileName = value.substr(found + 1);
+  std::string st_file;
+  if (context->getProperty(StateFile.getName(), st_file)) {
+    state_file_ = st_file + "." + getUUIDStr();
   }
-  if (context->getProperty(StateFile.getName(), value)) {
-    _stateFile = value + "." + getUUIDStr();
-  }
-  if (!this->_stateRecovered) {
-    _stateRecovered = true;
-    this->_currentTailFileName = fileName;
-    this->_currentTailFilePosition = 0;
+  if (!this->state_recovered_) {
+    state_recovered_ = true;
     // recover the state if we have not done so
     this->recoverState();
   }
-  checkRollOver(fileLocation, fileName);
-  std::string fullPath = fileLocation + "/" + _currentTailFileName;
-  struct stat statbuf;
 
-  logger_->log_debug("Tailing file %s", fullPath);
-  if (stat(fullPath.c_str(), &statbuf) == 0) {
-    if ((uint64_t) statbuf.st_size <= this->_currentTailFilePosition) {
-      logger_->log_trace("Current pos: %llu", this->_currentTailFilePosition);
-      logger_->log_trace("%s", "there are no new input for the current tail file");
-      context->yield();
-      return;
-    }
-    std::size_t found = _currentTailFileName.find_last_of(".");
-    std::string baseName = _currentTailFileName.substr(0, found);
-    std::string extension = _currentTailFileName.substr(found + 1);
+  /**
+   * iterate over file states. may modify them
+   */
+  for (auto &state : tail_states_) {
+    auto fileLocation = state.second.path_;
 
-    if (!delimiter_.empty()) {
-      char delim = delimiter_.c_str()[0];
-      if (delim == '\\') {
-        if (delimiter_.size() > 1) {
-          switch (delimiter_.c_str()[1]) {
-            case 'r':
-              delim = '\r';
-              break;
-            case 't':
-              delim = '\t';
-              break;
-            case 'n':
-              delim = '\n';
-              break;
-            case '\\':
-              delim = '\\';
-              break;
-            default:
-              // previous behavior
-              break;
+    checkRollOver(state.second, state.first);
+    std::string fullPath = fileLocation + utils::file::FileUtils::get_separator() + state.second.current_file_name_;
+    struct stat statbuf;
+
+    logger_->log_debug("Tailing file %s", fullPath);
+    if (stat(fullPath.c_str(), &statbuf) == 0) {
+      if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) {
+        logger_->log_trace("Current pos: %llu", state.second.currentTailFilePosition_);
+        logger_->log_trace("%s", "there are no new input for the current tail file");
+        context->yield();
+        return;
+      }
+      std::size_t found = state.first.find_last_of(".");
+      std::string baseName = state.first.substr(0, found);
+      std::string extension = state.first.substr(found + 1);
+
+      if (!delimiter_.empty()) {
+        char delim = delimiter_.c_str()[0];
+        if (delim == '\\') {
+          if (delimiter_.size() > 1) {
+            switch (delimiter_.c_str()[1]) {
+              case 'r':
+                delim = '\r';
+                break;
+              case 't':
+                delim = '\t';
+                break;
+              case 'n':
+                delim = '\n';
+                break;
+              case '\\':
+                delim = '\\';
+                break;
+              default:
+                // previous behavior
+                break;
+            }
           }
         }
-      }
-      logger_->log_debug("Looking for delimiter 0x%X", delim);
-      std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
-      session->import(fullPath, flowFiles, true, this->_currentTailFilePosition, delim);
-      logger_->log_info("%u flowfiles were received from TailFile input", flowFiles.size());
+        logger_->log_debug("Looking for delimiter 0x%X", delim);
+        std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
+        session->import(fullPath, flowFiles, true, state.second.currentTailFilePosition_, delim);
+        logger_->log_info("%u flowfiles were received from TailFile input", flowFiles.size());
 
-      for (auto ffr : flowFiles) {
-        logger_->log_info("TailFile %s for %u bytes", _currentTailFileName, ffr->getSize());
-        std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + ffr->getSize()) + "." + extension;
-        ffr->updateKeyedAttribute(PATH, fileLocation);
-        ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-        ffr->updateKeyedAttribute(FILENAME, logName);
-        session->transfer(ffr, Success);
-        this->_currentTailFilePosition += ffr->getSize() + 1;
-        storeState();
-      }
+        for (auto ffr : flowFiles) {
+          logger_->log_info("TailFile %s for %u bytes", state.first, ffr->getSize());
+          std::string logName = baseName + "." + std::to_string(state.second.currentTailFilePosition_) + "-" + std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + extension;
+          ffr->updateKeyedAttribute(PATH, fileLocation);
+          ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
+          ffr->updateKeyedAttribute(FILENAME, logName);
+          session->transfer(ffr, Success);
+          state.second.currentTailFilePosition_ += ffr->getSize() + 1;
+          storeState();
+        }
 
+      } else {
+        std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+        if (flowFile) {
+          flowFile->updateKeyedAttribute(PATH, fileLocation);
+          flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
+          session->import(fullPath, flowFile, true, state.second.currentTailFilePosition_);
+          session->transfer(flowFile, Success);
+          logger_->log_info("TailFile %s for %llu bytes", state.first, flowFile->getSize());
+          std::string logName = baseName + "." + std::to_string(state.second.currentTailFilePosition_) + "-" + std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + "."
+              + extension;
+          flowFile->updateKeyedAttribute(FILENAME, logName);
+          state.second.currentTailFilePosition_ += flowFile->getSize();
+          storeState();
+        }
+      }
+      state.second.currentTailFileModificationTime_ = ((uint64_t) (statbuf.st_mtime) * 1000);
     } else {
-      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
-      if (flowFile) {
-        flowFile->updateKeyedAttribute(PATH, fileLocation);
-        flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-        session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
-        session->transfer(flowFile, Success);
-        logger_->log_info("TailFile %s for %llu bytes", _currentTailFileName, flowFile->getSize());
-        std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" +
-                              std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension;
-        flowFile->updateKeyedAttribute(FILENAME, logName);
-        this->_currentTailFilePosition += flowFile->getSize();
-        storeState();
-      }
+      logger_->log_warn("Unable to stat file %s", fullPath);
     }
-    _currentTailFileModificationTime = ((uint64_t) (statbuf.st_mtime) * 1000);
-  } else {
-    logger_->log_warn("Unable to stat file %s", fullPath);
   }
 }
 
diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h
index 54296a2..470689a 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -23,10 +23,10 @@
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
+
 #include "core/Core.h"
 #include "core/Resource.h"
 #include "core/logging/LoggerConfiguration.h"
-
 namespace org {
 namespace apache {
 namespace nifi {
@@ -34,6 +34,23 @@
 namespace processors {
 
 // TailFile Class
+
+
+typedef struct {
+  std::string path_;
+  std::string current_file_name_;
+  uint64_t currentTailFilePosition_;
+  uint64_t currentTailFileModificationTime_;
+} TailState;
+
+// Matched File Item for Roll over check
+typedef struct {
+  std::string fileName;
+  uint64_t modifiedTime;
+} TailMatchedFileItem;
+
+
+
 class TailFile : public core::Processor {
  public:
   // Constructor
@@ -42,10 +59,8 @@
    */
   explicit TailFile(std::string name, utils::Identifier uuid = utils::Identifier())
       : core::Processor(name, uuid),
-        _currentTailFilePosition(0),
-        _currentTailFileModificationTime(0),
         logger_(logging::LoggerFactory<TailFile>::getLogger()) {
-    _stateRecovered = false;
+    state_recovered_ = false;
   }
   // Destructor
   virtual ~TailFile() {
@@ -57,40 +72,44 @@
   static core::Property FileName;
   static core::Property StateFile;
   static core::Property Delimiter;
+  static core::Property TailMode;
+  static core::Property BaseDirectory;
   // Supported Relationships
   static core::Relationship Success;
 
  public:
+
+  bool acceptFile(const std::string &filter, const std::string &file);
   /**
    * Function that's executed when the processor is scheduled.
    * @param context process context.
    * @param sessionFactory process session factory that is used when creating
    * ProcessSession objects.
    */
-  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
   // OnTrigger method, implemented by NiFi TailFile
-  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession>  &session) override;
   // Initialize, over write by NiFi TailFile
-  virtual void initialize(void);
+  void initialize(void) override;
   // recoverState
-  void recoverState();
+  bool recoverState();
   // storeState
   void storeState();
 
- protected:
-
  private:
+
+  static const char *CURRENT_STR;
+  static const char *POSITION_STR;
   std::mutex tail_file_mutex_;
   // File to save state
-  std::string _stateFile;
-  // State related to the tailed file
-  std::string _currentTailFileName;
+  std::string state_file_;
   // Delimiter for the data incoming from the tailed file.
   std::string delimiter_;
   // determine if state is recovered;
-  bool _stateRecovered;
-  uint64_t _currentTailFilePosition;
-  uint64_t _currentTailFileModificationTime;
+  bool state_recovered_;
+
+  std::map<std::string, TailState> tail_states_;
+
   static const int BUFFER_SIZE = 512;
 
   // Utils functions for parse state file
@@ -100,7 +119,7 @@
   /**
    * Check roll over for the provided file.
    */
-  void checkRollOver(const std::string &, const std::string&);
+  void checkRollOver(TailState &file, const std::string &base_file_name);
   std::shared_ptr<logging::Logger> logger_;
 };
 
@@ -111,12 +130,6 @@
                   " rather than running with the default value of 0 secs, as this Processor will consume a lot of resources if scheduled very aggressively. At this time, this Processor"
                   " does not support ingesting files that have been compressed when 'rolled over'.");
 
-// Matched File Item for Roll over check
-typedef struct {
-  std::string fileName;
-  uint64_t modifiedTime;
-} TailMatchedFileItem;
-
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index b7c96ec..e8c3269 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -29,6 +29,7 @@
 #include "TestBase.h"
 #include "core/Core.h"
 #include "core/FlowFile.h"
+#include "utils/file/FileUtils.h"
 #include "unit/ProvenanceTestHelper.h"
 #include "core/Processor.h"
 #include "core/ProcessContext.h"
@@ -37,19 +38,18 @@
 #include "TailFile.h"
 #include "LogAttribute.h"
 
-
-static std::string NEWLINE_FILE = "" // NOLINT
-    "one,two,three\n"
-    "four,five,six, seven";
+static std::string NEWLINE_FILE = ""  // NOLINT
+        "one,two,three\n"
+        "four,five,six, seven";
 static const char *TMP_FILE = "/tmp/minifi-tmpfile.txt";
 static const char *STATE_FILE = "/tmp/minifi-state-file.txt";
 
 TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
   // Create and write to the test file
-      std::ofstream tmpfile;
-      tmpfile.open(TMP_FILE);
-      tmpfile << NEWLINE_FILE;
-      tmpfile.close();
+  std::ofstream tmpfile;
+  tmpfile.open(TMP_FILE);
+  tmpfile << NEWLINE_FILE;
+  tmpfile.close();
 
   TestController testController;
   LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
@@ -83,10 +83,6 @@
 
 TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
   // Create and write to the test file
-      std::ofstream tmpfile;
-      tmpfile.open(TMP_FILE);
-      tmpfile << NEWLINE_FILE;
-      tmpfile.close();
 
   TestController testController;
   LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
@@ -98,11 +94,25 @@
 
   char format[] = "/tmp/gt.XXXXXX";
   char *dir = testController.createTempDirectory(format);
+  std::stringstream temp_file_ss;
+  temp_file_ss << dir << utils::file::FileUtils::get_separator() << "minifi-tmpfile.txt";
+  auto temp_file = temp_file_ss.str();
+  std::ofstream tmpfile;
+  tmpfile.open(temp_file);
+  tmpfile << NEWLINE_FILE;
+  tmpfile.close();
 
-  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), TMP_FILE);
+  SECTION("Single") {
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file);
+}
+
+  SECTION("Multiple") {
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "minifi-.*\\.txt");
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
+}
   plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE);
 
-
   testController.runSession(plan, false);
   auto records = plan->getProvenanceRecords();
   std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
@@ -114,17 +124,40 @@
   LogTestController::getInstance().reset();
 
   // Delete the test and state file.
-  remove(TMP_FILE);
   remove(STATE_FILE);
 }
 
+TEST_CASE("TailWithInvalid", "[tailfiletest2]") {
+  TestController testController;
+  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+
+  plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  SECTION("No File and No base") {
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+}
+
+  SECTION("No base") {
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "minifi-.*\\.txt");
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+}
+  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE);
+
+  REQUIRE_THROWS(plan->runNextProcessor());
+}
+
 TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") {
   TestController testController;
 
   const char DELIM = ',';
   size_t expected_pieces = std::count(NEWLINE_FILE.begin(), NEWLINE_FILE.end(), DELIM);  // The last piece is left as considered unfinished
 
-
   LogTestController::getInstance().setTrace<TestPlan>();
   LogTestController::getInstance().setTrace<processors::TailFile>();
   LogTestController::getInstance().setTrace<processors::LogAttribute>();
@@ -147,29 +180,24 @@
   in_file_stream.flush();
 
   // Build MiNiFi processing graph
-  auto tail_file = plan->addProcessor(
-      "TailFile",
-      "Tail");
-  plan->setProperty(
-      tail_file,
-      processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
+  auto tail_file = plan->addProcessor("TailFile", "Tail");
+  plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
+  SECTION("single") {
   plan->setProperty(
       tail_file,
       processors::TailFile::FileName.getName(), in_file);
-  plan->setProperty(
-      tail_file,
-      processors::TailFile::StateFile.getName(), state_file);
-  auto log_attr = plan->addProcessor(
-      "LogAttribute",
-      "Log",
-      core::Relationship("success", "description"),
-      true);
-  plan->setProperty(
-      log_attr,
-      processors::LogAttribute::FlowFilesToLog.getName(), "0");
+}
+  SECTION("Multiple") {
+  plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "test.*");
+  plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+  plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
+}
+  plan->setProperty(tail_file, processors::TailFile::StateFile.getName(), state_file);
+  auto log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
+  plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
+  plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), "true");
   // Log as many FFs as it can to make sure exactly the expected amount is produced
 
-
   plan->runNextProcessor();  // Tail
   plan->runNextProcessor();  // Log
 
@@ -178,10 +206,9 @@
   in_file_stream << DELIM;
   in_file_stream.close();
 
-
   std::string rotated_file = (in_file + ".1");
 
-  REQUIRE(rename(in_file.c_str(), rotated_file.c_str() ) == 0);
+  REQUIRE(rename(in_file.c_str(), rotated_file.c_str()) == 0);
 
   std::this_thread::sleep_for(std::chrono::milliseconds(1000));  // make sure the new file gets newer modification time
 
@@ -228,8 +255,7 @@
 
   for (int i = 2; 0 <= i; --i) {
     if (i < 2) {
-      std::this_thread::sleep_for(
-          std::chrono::milliseconds(1000));  // make sure the new file gets newer modification time
+      std::this_thread::sleep_for(std::chrono::milliseconds(1000));  // make sure the new file gets newer modification time
     }
     std::ofstream in_file_stream(in_file + (i > 0 ? std::to_string(i) : ""));
     for (int j = 0; j <= i; j++) {
@@ -239,29 +265,14 @@
   }
 
   // Build MiNiFi processing graph
-  auto tail_file = plan->addProcessor(
-      "TailFile",
-      "Tail");
-  plan->setProperty(
-      tail_file,
-      processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
-  plan->setProperty(
-      tail_file,
-      processors::TailFile::FileName.getName(), in_file);
-  plan->setProperty(
-      tail_file,
-      processors::TailFile::StateFile.getName(), state_file);
-  auto log_attr = plan->addProcessor(
-      "LogAttribute",
-      "Log",
-      core::Relationship("success", "description"),
-      true);
-  plan->setProperty(
-      log_attr,
-      processors::LogAttribute::FlowFilesToLog.getName(), "0");
+  auto tail_file = plan->addProcessor("TailFile", "Tail");
+  plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
+  plan->setProperty(tail_file, processors::TailFile::FileName.getName(), in_file);
+  plan->setProperty(tail_file, processors::TailFile::StateFile.getName(), state_file);
+  auto log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
+  plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
   // Log as many FFs as it can to make sure exactly the expected amount is produced
 
-
   // Each iteration should go through one file and log all flowfiles
   for (int i = 2; 0 <= i; --i) {
     plan->reset();
@@ -282,142 +293,141 @@
   REQUIRE(LogTestController::getInstance().contains(std::string("Logged 2 flow files")));
 }
 
-
 /*
-TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") {
-  try {
-    // Create and write to the test file
-    std::ofstream tmpfile;
-    tmpfile.open(TMP_FILE);
-    tmpfile << NEWLINE_FILE;
-    tmpfile.close();
+ TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") {
+ try {
+ // Create and write to the test file
+ std::ofstream tmpfile;
+ tmpfile.open(TMP_FILE);
+ tmpfile << NEWLINE_FILE;
+ tmpfile.close();
 
-    TestController testController;
-    LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::TailFile>();
-    LogTestController::getInstance().setDebug<core::ProcessSession>();
-    LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>();
+ TestController testController;
+ LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::TailFile>();
+ LogTestController::getInstance().setDebug<core::ProcessSession>();
+ LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>();
 
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+ std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
 
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+ std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
+ std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
 
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+ utils::Identifier processoruuid;
+ REQUIRE(true == processor->getUUID(processoruuid));
+ utils::Identifier logAttributeuuid;
+ REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
 
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("success", "TailFile successful output"));
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+ connection->addRelationship(core::Relationship("success", "TailFile successful output"));
 
-    // link the connections so that we can test results at the end for this
-    connection->setDestination(connection);
+ // link the connections so that we can test results at the end for this
+ connection->setDestination(connection);
 
-    connection->setSourceUUID(processoruuid);
+ connection->setSourceUUID(processoruuid);
 
-    processor->addConnection(connection);
+ processor->addConnection(connection);
 
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+ std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
 
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
-    context.setProperty(org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n");
-    context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
-    context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+ core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+ context.setProperty(org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n");
+ context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
+ context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
 
-    core::ProcessSession session(&context);
+ core::ProcessSession session(&context);
 
-    REQUIRE(processor->getName() == "tailfile");
+ REQUIRE(processor->getName() == "tailfile");
 
-    core::ProcessSessionFactory factory(&context);
+ core::ProcessSessionFactory factory(&context);
 
-    std::shared_ptr<core::FlowFile> record;
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    processor->onSchedule(&context, &factory);
-    processor->onTrigger(&context, &session);
+ std::shared_ptr<core::FlowFile> record;
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ processor->onSchedule(&context, &factory);
+ processor->onTrigger(&context, &session);
 
-    provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
-    std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
-    record = session.get();
-    REQUIRE(record == nullptr);
-    std::shared_ptr<core::FlowFile> ff = session.get();
-    REQUIRE(provRecords.size() == 4);   // 2 creates and 2 modifies for flowfiles
+ provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+ std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
+ record = session.get();
+ REQUIRE(record == nullptr);
+ std::shared_ptr<core::FlowFile> ff = session.get();
+ REQUIRE(provRecords.size() == 4);   // 2 creates and 2 modifies for flowfiles
 
-    LogTestController::getInstance().reset();
-  } catch (...) {
-  }
+ LogTestController::getInstance().reset();
+ } catch (...) {
+ }
 
-  // Delete the test and state file.
-  std::remove(TMP_FILE);
-  std::remove(STATE_FILE);
-}
+ // Delete the test and state file.
+ std::remove(TMP_FILE);
+ std::remove(STATE_FILE);
+ }
 
 
-TEST_CASE("TailFileWithoutDelimiter", "[tailfiletest2]") {
-  try {
-    // Create and write to the test file
-    std::ofstream tmpfile;
-    tmpfile.open(TMP_FILE);
-    tmpfile << NEWLINE_FILE;
-    tmpfile.close();
+ TEST_CASE("TailFileWithoutDelimiter", "[tailfiletest2]") {
+ try {
+ // Create and write to the test file
+ std::ofstream tmpfile;
+ tmpfile.open(TMP_FILE);
+ tmpfile << NEWLINE_FILE;
+ tmpfile.close();
 
-    TestController testController;
-    LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
+ TestController testController;
+ LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
 
-    std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+ std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
 
-    std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+ std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
+ std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
 
-    utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
-    utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+ utils::Identifier processoruuid;
+ REQUIRE(true == processor->getUUID(processoruuid));
+ utils::Identifier logAttributeuuid;
+ REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
 
-    std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
-    std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    connection->addRelationship(core::Relationship("success", "TailFile successful output"));
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+ connection->addRelationship(core::Relationship("success", "TailFile successful output"));
 
-    // link the connections so that we can test results at the end for this
-    connection->setDestination(connection);
-    connection->setSourceUUID(processoruuid);
+ // link the connections so that we can test results at the end for this
+ connection->setDestination(connection);
+ connection->setSourceUUID(processoruuid);
 
-    processor->addConnection(connection);
+ processor->addConnection(connection);
 
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+ std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
 
-    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-    core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
-    context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
-    context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+ core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+ context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
+ context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
 
-    core::ProcessSession session(&context);
+ core::ProcessSession session(&context);
 
-    REQUIRE(processor->getName() == "tailfile");
+ REQUIRE(processor->getName() == "tailfile");
 
-    core::ProcessSessionFactory factory(&context);
+ core::ProcessSessionFactory factory(&context);
 
-    std::shared_ptr<core::FlowFile> record;
-    processor->setScheduledState(core::ScheduledState::RUNNING);
-    processor->onSchedule(&context, &factory);
-    processor->onTrigger(&context, &session);
+ std::shared_ptr<core::FlowFile> record;
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ processor->onSchedule(&context, &factory);
+ processor->onTrigger(&context, &session);
 
-    provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
-    std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
-    record = session.get();
-    REQUIRE(record == nullptr);
-    std::shared_ptr<core::FlowFile> ff = session.get();
-    REQUIRE(provRecords.size() == 2);
+ provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+ std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
+ record = session.get();
+ REQUIRE(record == nullptr);
+ std::shared_ptr<core::FlowFile> ff = session.get();
+ REQUIRE(provRecords.size() == 2);
 
-    LogTestController::getInstance().reset();
-  } catch (...) {
-  }
+ LogTestController::getInstance().reset();
+ } catch (...) {
+ }
 
-  // Delete the test and state file.
-  std::remove(TMP_FILE);
-  std::remove(STATE_FILE);
-}
-*/
+ // Delete the test and state file.
+ std::remove(TMP_FILE);
+ std::remove(STATE_FILE);
+ }
+ */
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index c3ae370..dd91442 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -97,7 +97,8 @@
 if (OPENSSL_FOUND)
 	set(TLS_SOURCES "src/io/tls/*.cpp")
 endif(OPENSSL_FOUND)
-file(GLOB SOURCES  "src/sitetosite/*.cpp"  "src/core/logging/*.cpp"  "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp"  "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp")
+
+file(GLOB SOURCES  "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp"  "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp"  "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp")
 
 file(GLOB PROCESSOR_SOURCES  "src/processors/*.cpp" )
 
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 2158b47..daf3a20 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -36,7 +36,11 @@
   MILLISECOND,
   NANOSECOND
 };
-
+#if defined(WIN32) || (__cplusplus >= 201103L && (!defined(__GLIBCXX__) || (__cplusplus >= 201402L) ||  (defined(_GLIBCXX_RELEASE) && _GLIBCXX_RELEASE > 4)))
+#define HAVE_REGEX_CPP 1
+#else
+#define HAVE_REGEX_CPP 0
+#endif
 namespace org {
 namespace apache {
 namespace nifi {
diff --git a/libminifi/include/utils/file/PathUtils.h b/libminifi/include/utils/file/PathUtils.h
new file mode 100644
index 0000000..12925ce
--- /dev/null
+++ b/libminifi/include/utils/file/PathUtils.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_UTILS_PATHUTILS_H_
+#define LIBMINIFI_INCLUDE_UTILS_PATHUTILS_H_
+
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+namespace file {
+namespace PathUtils {
+
+/**
+ * Extracts the filename and path performing some validation of the path and output to ensure
+ * we don't provide invalid results.
+ * @param path input path
+ * @param filePath output file path
+ * @param fileName output file name
+ * @return result of the operation.
+ */
+extern bool getFileNameAndPath(const std::string &path, std::string &filePath, std::string &fileName);
+
+} /* namespace PathUtils */
+} /* namespace file */
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_UTILS_PATHUTILS_H_ */
diff --git a/libminifi/src/utils/file/PathUtils.cpp b/libminifi/src/utils/file/PathUtils.cpp
new file mode 100644
index 0000000..4982369
--- /dev/null
+++ b/libminifi/src/utils/file/PathUtils.cpp
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/file/PathUtils.h"
+#include "utils/file/FileUtils.h"
+#include <iostream>
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+namespace file {
+
+bool PathUtils::getFileNameAndPath(const std::string &path, std::string &filePath, std::string &fileName) {
+  const std::size_t found = path.find_last_of(FileUtils::get_separator());
+  /**
+   * Don't make an assumption about about the path, return false for this case.
+   * Could make the assumption that the path is just the file name but with the function named
+   * getFileNameAndPath we expect both to be there ( a fully qualified path ).
+   *
+   */
+  if (found == std::string::npos || found == path.length() - 1) {
+    return false;
+  }
+  if (found == 0) {
+    filePath = "";  // don't assume that path is not empty
+    filePath += FileUtils::get_separator();
+    fileName = path.substr(found + 1);
+    return true;
+  }
+  filePath = path.substr(0, found);
+  fileName = path.substr(found + 1);
+  return true;
+}
+
+} /* namespace file */
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
diff --git a/libminifi/test/unit/FileUtilsTests.cpp b/libminifi/test/unit/FileUtilsTests.cpp
index 85c783b..0abd3f6 100644
--- a/libminifi/test/unit/FileUtilsTests.cpp
+++ b/libminifi/test/unit/FileUtilsTests.cpp
@@ -23,6 +23,7 @@
 #include "../TestBase.h"
 #include "core/Core.h"
 #include "utils/file/FileUtils.h"
+#include "utils/file/PathUtils.h"
 
 using org::apache::nifi::minifi::utils::file::FileUtils;
 
@@ -81,6 +82,44 @@
 #endif
 }
 
+TEST_CASE("TestFilePath", "[TestGetFileNameAndPath]") {
+  SECTION("VALID FILE AND PATH") {
+  std::stringstream path;
+  path << "a" << FileUtils::get_separator() << "b" << FileUtils::get_separator() << "c";
+  std::stringstream file;
+  file << path.str() << FileUtils::get_separator() << "file";
+  std::string filename, filepath;
+  REQUIRE(true == utils::file::PathUtils::getFileNameAndPath(file.str(), filepath, filename) );
+  REQUIRE(path.str() == filepath);
+  REQUIRE("file" == filename);
+}
+SECTION("NO FILE VALID PATH") {
+  std::stringstream path;
+  path << "a" << FileUtils::get_separator() << "b" << FileUtils::get_separator() << "c" << FileUtils::get_separator();
+  std::string filename, filepath;
+  REQUIRE(false == utils::file::PathUtils::getFileNameAndPath(path.str(), filepath, filename) );
+  REQUIRE(filepath.empty());
+  REQUIRE(filename.empty());
+}
+SECTION("FILE NO PATH") {
+  std::string path = "/file";
+  std::string filename, filepath;
+  std::string expectedPath;
+  expectedPath += FileUtils::get_separator();
+  REQUIRE(true == utils::file::PathUtils::getFileNameAndPath(path, filepath, filename) );
+  REQUIRE(expectedPath == filepath);
+  REQUIRE("file" == filename);
+}
+SECTION("NO FILE NO PATH") {
+  std::string path = "file";
+  std::string filename, filepath;
+  std::string expectedPath = "" + FileUtils::get_separator();
+  REQUIRE(false == utils::file::PathUtils::getFileNameAndPath(path, filepath, filename) );
+  REQUIRE(filepath.empty());
+  REQUIRE(filename.empty());
+}
+}
+
 TEST_CASE("TestFileUtils::get_executable_path", "[TestGetExecutablePath]") {
   std::string executable_path = FileUtils::get_executable_path();
   std::cerr << "Executable path: " << executable_path << std::endl;