MINIFICPP-30: Add support for regex with Multiple file mode
MINIFICPP-30: Update docs to make clear the impact of rollover on Multi file mode
MINIFICPP-30: Fix test issue
Approved by aboda on github
This closes #580.
Signed-off-by: Marc Parisi <phrocker@apache.org>
diff --git a/extensions/standard-processors/CMakeLists.txt b/extensions/standard-processors/CMakeLists.txt
index 7691c78..ddaf19f 100644
--- a/extensions/standard-processors/CMakeLists.txt
+++ b/extensions/standard-processors/CMakeLists.txt
@@ -31,7 +31,7 @@
target_link_libraries(minifi-standard-processors "${CMAKE_THREAD_LIBS_INIT}")
endif()
-
+target_link_libraries(minifi-standard-processors core-minifi)
SET (STANDARD-PROCESSORS minifi-standard-processors PARENT_SCOPE)
register_extension(minifi-standard-processors)
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 8302aeb..c282df2 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -37,8 +37,14 @@
#include <string>
#include <iostream>
#include "utils/file/FileUtils.h"
+#include "utils/file/PathUtils.h"
#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
+#ifdef HAVE_REGEX_CPP
+#include <regex>
+#else
+#include <regex.h>
+#endif
#include "TailFile.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
@@ -61,21 +67,36 @@
namespace minifi {
namespace processors {
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed", "");
+core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed when using single file mode, or a file regex when using multifile mode", "");
core::Property TailFile::StateFile("State File", "Specifies the file that should be used for storing state about"
" what data has been ingested so that upon restart NiFi can resume from where it left off",
"TailFileState");
core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character that should be used for delimiting the data being tailed"
"from the incoming file.",
"");
+
+core::Property TailFile::TailMode(
+ core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")->withDescription(
+ "Specifies the tail file mode. In 'Single file' mode only a single file will be watched. "
+ "In 'Multiple file' mode a regex may be used. Note that in multiple file mode we will still continue to watch for rollover on the initial set of watched files. "
+ "The Regex used to locate multiple files will be run during the schedule phrase. Note that if rotated files are matched by the regex, those files will be tailed.")->isRequired(true)
+ ->withAllowableValue<std::string>("Single file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")->build());
+
+core::Property TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory", "Base Directory")->isRequired(false)->build());
+
core::Relationship TailFile::Success("success", "All files are routed to success");
+const char *TailFile::CURRENT_STR = "CURRENT.";
+const char *TailFile::POSITION_STR = "POSITION.";
+
void TailFile::initialize() {
// Set the supported properties
std::set<core::Property> properties;
properties.insert(FileName);
properties.insert(StateFile);
properties.insert(Delimiter);
+ properties.insert(TailMode);
+ properties.insert(BaseDirectory);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
@@ -83,12 +104,65 @@
setSupportedRelationships(relationships);
}
-void TailFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+ std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
+
std::string value;
if (context->getProperty(Delimiter.getName(), value)) {
delimiter_ = value;
}
+
+ std::string mode;
+ context->getProperty(TailMode.getName(), mode);
+
+ std::string file = "";
+ if (!context->getProperty(FileName.getName(), file)) {
+ throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to Tail is a required property");
+ }
+ if (mode == "Multiple file") {
+ // file is a regex
+ std::string base_dir;
+ if (!context->getProperty(BaseDirectory.getName(), base_dir)) {
+ throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base directory is required for multiple tail mode.");
+ }
+
+ auto fileRegexSelect = [&](const std::string& path, const std::string& filename) -> bool {
+ if (acceptFile(file, filename)) {
+ tail_states_.insert(std::make_pair(filename, TailState {path, filename, 0, 0}));
+ }
+ return true;
+ };
+
+ utils::file::FileUtils::list_dir(base_dir, fileRegexSelect, logger_, false);
+
+ } else {
+ std::string fileLocation, fileName;
+ if (utils::file::PathUtils::getFileNameAndPath(file, fileLocation, fileName)) {
+ tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, fileName, 0, 0 }));
+ } else {
+ throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to tail must be a fully qualified file");
+ }
+ }
+}
+
+bool TailFile::acceptFile(const std::string &fileFilter, const std::string &file) {
+#ifndef HAVE_REGEX_CPP
+ regex_t regex;
+ int ret = regcomp(®ex, fileFilter.c_str(), 0);
+ if (ret)
+ return false;
+ ret = regexec(®ex, file.c_str(), (size_t) 0, NULL, 0);
+ regfree(®ex);
+ if (ret)
+ return false;
+#else
+ std::regex regex(fileFilter);
+ if (!std::regex_match(file, regex)) {
+ return false;
+ }
+ return true;
+#endif
}
std::string TailFile::trimLeft(const std::string& s) {
@@ -131,51 +205,82 @@
key = trimRight(key);
value = trimRight(value);
- if (key == "FILENAME")
- this->_currentTailFileName = value;
- if (key == "POSITION")
- this->_currentTailFilePosition = std::stoi(value);
+ if (key == "FILENAME") {
+ std::string fileLocation, fileName;
+ if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
+ tail_states_.insert(std::make_pair(value, TailState { fileLocation, fileName, 0, 0 }));
+ } else {
+ throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file contains an invalid file name");
+ }
+ }
+ if (key == "POSITION") {
+ // for backwards compatibility
+ if (tail_states_.size() != 1) {
+ throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Incompatible state file types");
+ }
+ tail_states_.begin()->second.currentTailFilePosition_ = std::stoi(value);
+ }
+ if (key.find(CURRENT_STR) == 0) {
+ const auto file = key.substr(strlen(CURRENT_STR) + 1);
+ std::string fileLocation, fileName;
+ if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
+ tail_states_[file].path_ = fileLocation;
+ tail_states_[file].current_file_name_ = fileName;
+ } else {
+ throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file contains an invalid file name");
+ }
+ }
+
+ if (key.find("POSITION.") == 0) {
+ const auto file = key.substr(strlen(POSITION_STR) + 1);
+ tail_states_[file].currentTailFilePosition_ = std::stoi(value);
+ }
return;
}
-void TailFile::recoverState() {
- std::ifstream file(_stateFile.c_str(), std::ifstream::in);
+bool TailFile::recoverState() {
+ std::ifstream file(state_file_.c_str(), std::ifstream::in);
if (!file.good()) {
- logger_->log_error("load state file failed %s", _stateFile);
- return;
+ logger_->log_error("load state file failed %s", state_file_);
+ return false;
}
+ tail_states_.clear();
char buf[BUFFER_SIZE];
for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) {
parseStateFileLine(buf);
}
+ return true;
}
void TailFile::storeState() {
- std::ofstream file(_stateFile.c_str());
+ std::ofstream file(state_file_.c_str());
if (!file.is_open()) {
- logger_->log_error("store state file failed %s", _stateFile);
+ logger_->log_error("store state file failed %s", state_file_);
return;
}
- file << "FILENAME=" << this->_currentTailFileName << "\n";
- file << "POSITION=" << this->_currentTailFilePosition << "\n";
+ for (const auto &state : tail_states_) {
+ file << "FILENAME=" << state.first << "\n";
+ file << CURRENT_STR << state.first << "=" << state.second.path_ << utils::file::FileUtils::get_separator() << state.second.current_file_name_ << "\n";
+ file << POSITION_STR << state.first << "=" << state.second.currentTailFilePosition_ << "\n";
+ }
file.close();
}
static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) {
return (i.modifiedTime < j.modifiedTime);
}
-void TailFile::checkRollOver(const std::string &fileLocation, const std::string &baseFileName) {
+void TailFile::checkRollOver(TailState &file, const std::string &base_file_name) {
struct stat statbuf;
std::vector<TailMatchedFileItem> matchedFiles;
- std::string fullPath = fileLocation + "/" + _currentTailFileName;
+ std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() + file.current_file_name_;
if (stat(fullPath.c_str(), &statbuf) == 0) {
logger_->log_trace("Searching for files rolled over");
- std::string pattern = baseFileName;
- std::size_t found = baseFileName.find_last_of(".");
+ std::string pattern = file.current_file_name_;
+ std::size_t found = file.current_file_name_.find_last_of(".");
if (found != std::string::npos)
- pattern = baseFileName.substr(0, found);
+ pattern = file.current_file_name_.substr(0, found);
// Callback, called for each file entry in the listed directory
// Return value is used to break (false) or continue (true) listing
@@ -184,21 +289,20 @@
std::string fileFullName = path + utils::file::FileUtils::get_separator() + filename;
if ((fileFullName.find(pattern) != std::string::npos) && stat(fileFullName.c_str(), &sb) == 0) {
uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
- if (candidateModTime >= _currentTailFileModificationTime) {
- if (filename == _currentTailFileName && candidateModTime == _currentTailFileModificationTime &&
- sb.st_size == _currentTailFilePosition) {
+ if (candidateModTime >= file.currentTailFileModificationTime_) {
+ if (filename == file.current_file_name_ && candidateModTime == file.currentTailFileModificationTime_ &&
+ sb.st_size == file.currentTailFilePosition_) {
return true; // Skip the current file as a candidate in case it wasn't updated
- }
- TailMatchedFileItem item;
- item.fileName = filename;
- item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000);
- matchedFiles.push_back(item);
- }
}
- return true;
- };
+ TailMatchedFileItem item;
+ item.fileName = filename;
+ item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000);
+ matchedFiles.push_back(item);
+ }
+ }
+ return true;};
- utils::file::FileUtils::list_dir(fileLocation, lambda, logger_, false);
+ utils::file::FileUtils::list_dir(file.path_, lambda, logger_, false);
if (matchedFiles.size() < 1) {
logger_->log_debug("No newer files found in directory!");
@@ -208,112 +312,113 @@
// Sort the list based on modified time
std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem);
TailMatchedFileItem item = matchedFiles[0];
- logger_->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName, item.fileName);
+ logger_->log_info("TailFile File Roll Over from %s to %s", file.current_file_name_, item.fileName);
// Going ahead in the file rolled over
- if (_currentTailFileName != baseFileName) {
- _currentTailFilePosition = 0;
+ if (file.current_file_name_ != base_file_name) {
+ file.currentTailFilePosition_ = 0;
}
- _currentTailFileName = item.fileName;
+
+ file.current_file_name_ = item.fileName;
+
storeState();
} else {
return;
}
}
-void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
- std::string value;
- std::string fileLocation = "";
- std::string fileName = "";
- if (context->getProperty(FileName.getName(), value)) {
- std::size_t found = value.find_last_of(utils::file::FileUtils::get_separator());
- fileLocation = value.substr(0, found);
- fileName = value.substr(found + 1);
+ std::string st_file;
+ if (context->getProperty(StateFile.getName(), st_file)) {
+ state_file_ = st_file + "." + getUUIDStr();
}
- if (context->getProperty(StateFile.getName(), value)) {
- _stateFile = value + "." + getUUIDStr();
- }
- if (!this->_stateRecovered) {
- _stateRecovered = true;
- this->_currentTailFileName = fileName;
- this->_currentTailFilePosition = 0;
+ if (!this->state_recovered_) {
+ state_recovered_ = true;
// recover the state if we have not done so
this->recoverState();
}
- checkRollOver(fileLocation, fileName);
- std::string fullPath = fileLocation + "/" + _currentTailFileName;
- struct stat statbuf;
- logger_->log_debug("Tailing file %s", fullPath);
- if (stat(fullPath.c_str(), &statbuf) == 0) {
- if ((uint64_t) statbuf.st_size <= this->_currentTailFilePosition) {
- logger_->log_trace("Current pos: %llu", this->_currentTailFilePosition);
- logger_->log_trace("%s", "there are no new input for the current tail file");
- context->yield();
- return;
- }
- std::size_t found = _currentTailFileName.find_last_of(".");
- std::string baseName = _currentTailFileName.substr(0, found);
- std::string extension = _currentTailFileName.substr(found + 1);
+ /**
+ * iterate over file states. may modify them
+ */
+ for (auto &state : tail_states_) {
+ auto fileLocation = state.second.path_;
- if (!delimiter_.empty()) {
- char delim = delimiter_.c_str()[0];
- if (delim == '\\') {
- if (delimiter_.size() > 1) {
- switch (delimiter_.c_str()[1]) {
- case 'r':
- delim = '\r';
- break;
- case 't':
- delim = '\t';
- break;
- case 'n':
- delim = '\n';
- break;
- case '\\':
- delim = '\\';
- break;
- default:
- // previous behavior
- break;
+ checkRollOver(state.second, state.first);
+ std::string fullPath = fileLocation + utils::file::FileUtils::get_separator() + state.second.current_file_name_;
+ struct stat statbuf;
+
+ logger_->log_debug("Tailing file %s", fullPath);
+ if (stat(fullPath.c_str(), &statbuf) == 0) {
+ if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) {
+ logger_->log_trace("Current pos: %llu", state.second.currentTailFilePosition_);
+ logger_->log_trace("%s", "there are no new input for the current tail file");
+ context->yield();
+ return;
+ }
+ std::size_t found = state.first.find_last_of(".");
+ std::string baseName = state.first.substr(0, found);
+ std::string extension = state.first.substr(found + 1);
+
+ if (!delimiter_.empty()) {
+ char delim = delimiter_.c_str()[0];
+ if (delim == '\\') {
+ if (delimiter_.size() > 1) {
+ switch (delimiter_.c_str()[1]) {
+ case 'r':
+ delim = '\r';
+ break;
+ case 't':
+ delim = '\t';
+ break;
+ case 'n':
+ delim = '\n';
+ break;
+ case '\\':
+ delim = '\\';
+ break;
+ default:
+ // previous behavior
+ break;
+ }
}
}
- }
- logger_->log_debug("Looking for delimiter 0x%X", delim);
- std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
- session->import(fullPath, flowFiles, true, this->_currentTailFilePosition, delim);
- logger_->log_info("%u flowfiles were received from TailFile input", flowFiles.size());
+ logger_->log_debug("Looking for delimiter 0x%X", delim);
+ std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
+ session->import(fullPath, flowFiles, true, state.second.currentTailFilePosition_, delim);
+ logger_->log_info("%u flowfiles were received from TailFile input", flowFiles.size());
- for (auto ffr : flowFiles) {
- logger_->log_info("TailFile %s for %u bytes", _currentTailFileName, ffr->getSize());
- std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + ffr->getSize()) + "." + extension;
- ffr->updateKeyedAttribute(PATH, fileLocation);
- ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
- ffr->updateKeyedAttribute(FILENAME, logName);
- session->transfer(ffr, Success);
- this->_currentTailFilePosition += ffr->getSize() + 1;
- storeState();
- }
+ for (auto ffr : flowFiles) {
+ logger_->log_info("TailFile %s for %u bytes", state.first, ffr->getSize());
+ std::string logName = baseName + "." + std::to_string(state.second.currentTailFilePosition_) + "-" + std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + extension;
+ ffr->updateKeyedAttribute(PATH, fileLocation);
+ ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
+ ffr->updateKeyedAttribute(FILENAME, logName);
+ session->transfer(ffr, Success);
+ state.second.currentTailFilePosition_ += ffr->getSize() + 1;
+ storeState();
+ }
+ } else {
+ std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+ if (flowFile) {
+ flowFile->updateKeyedAttribute(PATH, fileLocation);
+ flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
+ session->import(fullPath, flowFile, true, state.second.currentTailFilePosition_);
+ session->transfer(flowFile, Success);
+ logger_->log_info("TailFile %s for %llu bytes", state.first, flowFile->getSize());
+ std::string logName = baseName + "." + std::to_string(state.second.currentTailFilePosition_) + "-" + std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + "."
+ + extension;
+ flowFile->updateKeyedAttribute(FILENAME, logName);
+ state.second.currentTailFilePosition_ += flowFile->getSize();
+ storeState();
+ }
+ }
+ state.second.currentTailFileModificationTime_ = ((uint64_t) (statbuf.st_mtime) * 1000);
} else {
- std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
- if (flowFile) {
- flowFile->updateKeyedAttribute(PATH, fileLocation);
- flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
- session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
- session->transfer(flowFile, Success);
- logger_->log_info("TailFile %s for %llu bytes", _currentTailFileName, flowFile->getSize());
- std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" +
- std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension;
- flowFile->updateKeyedAttribute(FILENAME, logName);
- this->_currentTailFilePosition += flowFile->getSize();
- storeState();
- }
+ logger_->log_warn("Unable to stat file %s", fullPath);
}
- _currentTailFileModificationTime = ((uint64_t) (statbuf.st_mtime) * 1000);
- } else {
- logger_->log_warn("Unable to stat file %s", fullPath);
}
}
diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h
index 54296a2..470689a 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -23,10 +23,10 @@
#include "FlowFileRecord.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
+
#include "core/Core.h"
#include "core/Resource.h"
#include "core/logging/LoggerConfiguration.h"
-
namespace org {
namespace apache {
namespace nifi {
@@ -34,6 +34,23 @@
namespace processors {
// TailFile Class
+
+
+typedef struct {
+ std::string path_;
+ std::string current_file_name_;
+ uint64_t currentTailFilePosition_;
+ uint64_t currentTailFileModificationTime_;
+} TailState;
+
+// Matched File Item for Roll over check
+typedef struct {
+ std::string fileName;
+ uint64_t modifiedTime;
+} TailMatchedFileItem;
+
+
+
class TailFile : public core::Processor {
public:
// Constructor
@@ -42,10 +59,8 @@
*/
explicit TailFile(std::string name, utils::Identifier uuid = utils::Identifier())
: core::Processor(name, uuid),
- _currentTailFilePosition(0),
- _currentTailFileModificationTime(0),
logger_(logging::LoggerFactory<TailFile>::getLogger()) {
- _stateRecovered = false;
+ state_recovered_ = false;
}
// Destructor
virtual ~TailFile() {
@@ -57,40 +72,44 @@
static core::Property FileName;
static core::Property StateFile;
static core::Property Delimiter;
+ static core::Property TailMode;
+ static core::Property BaseDirectory;
// Supported Relationships
static core::Relationship Success;
public:
+
+ bool acceptFile(const std::string &filter, const std::string &file);
/**
* Function that's executed when the processor is scheduled.
* @param context process context.
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
- void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+ void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
// OnTrigger method, implemented by NiFi TailFile
- virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+ void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
// Initialize, over write by NiFi TailFile
- virtual void initialize(void);
+ void initialize(void) override;
// recoverState
- void recoverState();
+ bool recoverState();
// storeState
void storeState();
- protected:
-
private:
+
+ static const char *CURRENT_STR;
+ static const char *POSITION_STR;
std::mutex tail_file_mutex_;
// File to save state
- std::string _stateFile;
- // State related to the tailed file
- std::string _currentTailFileName;
+ std::string state_file_;
// Delimiter for the data incoming from the tailed file.
std::string delimiter_;
// determine if state is recovered;
- bool _stateRecovered;
- uint64_t _currentTailFilePosition;
- uint64_t _currentTailFileModificationTime;
+ bool state_recovered_;
+
+ std::map<std::string, TailState> tail_states_;
+
static const int BUFFER_SIZE = 512;
// Utils functions for parse state file
@@ -100,7 +119,7 @@
/**
* Check roll over for the provided file.
*/
- void checkRollOver(const std::string &, const std::string&);
+ void checkRollOver(TailState &file, const std::string &base_file_name);
std::shared_ptr<logging::Logger> logger_;
};
@@ -111,12 +130,6 @@
" rather than running with the default value of 0 secs, as this Processor will consume a lot of resources if scheduled very aggressively. At this time, this Processor"
" does not support ingesting files that have been compressed when 'rolled over'.");
-// Matched File Item for Roll over check
-typedef struct {
- std::string fileName;
- uint64_t modifiedTime;
-} TailMatchedFileItem;
-
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index b7c96ec..e8c3269 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -29,6 +29,7 @@
#include "TestBase.h"
#include "core/Core.h"
#include "core/FlowFile.h"
+#include "utils/file/FileUtils.h"
#include "unit/ProvenanceTestHelper.h"
#include "core/Processor.h"
#include "core/ProcessContext.h"
@@ -37,19 +38,18 @@
#include "TailFile.h"
#include "LogAttribute.h"
-
-static std::string NEWLINE_FILE = "" // NOLINT
- "one,two,three\n"
- "four,five,six, seven";
+static std::string NEWLINE_FILE = "" // NOLINT
+ "one,two,three\n"
+ "four,five,six, seven";
static const char *TMP_FILE = "/tmp/minifi-tmpfile.txt";
static const char *STATE_FILE = "/tmp/minifi-state-file.txt";
TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
// Create and write to the test file
- std::ofstream tmpfile;
- tmpfile.open(TMP_FILE);
- tmpfile << NEWLINE_FILE;
- tmpfile.close();
+ std::ofstream tmpfile;
+ tmpfile.open(TMP_FILE);
+ tmpfile << NEWLINE_FILE;
+ tmpfile.close();
TestController testController;
LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
@@ -83,10 +83,6 @@
TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
// Create and write to the test file
- std::ofstream tmpfile;
- tmpfile.open(TMP_FILE);
- tmpfile << NEWLINE_FILE;
- tmpfile.close();
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
@@ -98,11 +94,25 @@
char format[] = "/tmp/gt.XXXXXX";
char *dir = testController.createTempDirectory(format);
+ std::stringstream temp_file_ss;
+ temp_file_ss << dir << utils::file::FileUtils::get_separator() << "minifi-tmpfile.txt";
+ auto temp_file = temp_file_ss.str();
+ std::ofstream tmpfile;
+ tmpfile.open(temp_file);
+ tmpfile << NEWLINE_FILE;
+ tmpfile.close();
- plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), TMP_FILE);
+ SECTION("Single") {
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file);
+}
+
+ SECTION("Multiple") {
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "minifi-.*\\.txt");
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
+}
plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE);
-
testController.runSession(plan, false);
auto records = plan->getProvenanceRecords();
std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
@@ -114,17 +124,40 @@
LogTestController::getInstance().reset();
// Delete the test and state file.
- remove(TMP_FILE);
remove(STATE_FILE);
}
+TEST_CASE("TailWithInvalid", "[tailfiletest2]") {
+ TestController testController;
+ LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+
+ plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
+
+ char format[] = "/tmp/gt.XXXXXX";
+ char *dir = testController.createTempDirectory(format);
+
+ SECTION("No File and No base") {
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+}
+
+ SECTION("No base") {
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "minifi-.*\\.txt");
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+}
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE);
+
+ REQUIRE_THROWS(plan->runNextProcessor());
+}
+
TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") {
TestController testController;
const char DELIM = ',';
size_t expected_pieces = std::count(NEWLINE_FILE.begin(), NEWLINE_FILE.end(), DELIM); // The last piece is left as considered unfinished
-
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setTrace<processors::TailFile>();
LogTestController::getInstance().setTrace<processors::LogAttribute>();
@@ -147,29 +180,24 @@
in_file_stream.flush();
// Build MiNiFi processing graph
- auto tail_file = plan->addProcessor(
- "TailFile",
- "Tail");
- plan->setProperty(
- tail_file,
- processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
+ auto tail_file = plan->addProcessor("TailFile", "Tail");
+ plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
+ SECTION("single") {
plan->setProperty(
tail_file,
processors::TailFile::FileName.getName(), in_file);
- plan->setProperty(
- tail_file,
- processors::TailFile::StateFile.getName(), state_file);
- auto log_attr = plan->addProcessor(
- "LogAttribute",
- "Log",
- core::Relationship("success", "description"),
- true);
- plan->setProperty(
- log_attr,
- processors::LogAttribute::FlowFilesToLog.getName(), "0");
+}
+ SECTION("Multiple") {
+ plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "test.*");
+ plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+ plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
+}
+ plan->setProperty(tail_file, processors::TailFile::StateFile.getName(), state_file);
+ auto log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
+ plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
+ plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), "true");
// Log as many FFs as it can to make sure exactly the expected amount is produced
-
plan->runNextProcessor(); // Tail
plan->runNextProcessor(); // Log
@@ -178,10 +206,9 @@
in_file_stream << DELIM;
in_file_stream.close();
-
std::string rotated_file = (in_file + ".1");
- REQUIRE(rename(in_file.c_str(), rotated_file.c_str() ) == 0);
+ REQUIRE(rename(in_file.c_str(), rotated_file.c_str()) == 0);
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // make sure the new file gets newer modification time
@@ -228,8 +255,7 @@
for (int i = 2; 0 <= i; --i) {
if (i < 2) {
- std::this_thread::sleep_for(
- std::chrono::milliseconds(1000)); // make sure the new file gets newer modification time
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // make sure the new file gets newer modification time
}
std::ofstream in_file_stream(in_file + (i > 0 ? std::to_string(i) : ""));
for (int j = 0; j <= i; j++) {
@@ -239,29 +265,14 @@
}
// Build MiNiFi processing graph
- auto tail_file = plan->addProcessor(
- "TailFile",
- "Tail");
- plan->setProperty(
- tail_file,
- processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
- plan->setProperty(
- tail_file,
- processors::TailFile::FileName.getName(), in_file);
- plan->setProperty(
- tail_file,
- processors::TailFile::StateFile.getName(), state_file);
- auto log_attr = plan->addProcessor(
- "LogAttribute",
- "Log",
- core::Relationship("success", "description"),
- true);
- plan->setProperty(
- log_attr,
- processors::LogAttribute::FlowFilesToLog.getName(), "0");
+ auto tail_file = plan->addProcessor("TailFile", "Tail");
+ plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
+ plan->setProperty(tail_file, processors::TailFile::FileName.getName(), in_file);
+ plan->setProperty(tail_file, processors::TailFile::StateFile.getName(), state_file);
+ auto log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
+ plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
// Log as many FFs as it can to make sure exactly the expected amount is produced
-
// Each iteration should go through one file and log all flowfiles
for (int i = 2; 0 <= i; --i) {
plan->reset();
@@ -282,142 +293,141 @@
REQUIRE(LogTestController::getInstance().contains(std::string("Logged 2 flow files")));
}
-
/*
-TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") {
- try {
- // Create and write to the test file
- std::ofstream tmpfile;
- tmpfile.open(TMP_FILE);
- tmpfile << NEWLINE_FILE;
- tmpfile.close();
+ TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") {
+ try {
+ // Create and write to the test file
+ std::ofstream tmpfile;
+ tmpfile.open(TMP_FILE);
+ tmpfile << NEWLINE_FILE;
+ tmpfile.close();
- TestController testController;
- LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::TailFile>();
- LogTestController::getInstance().setDebug<core::ProcessSession>();
- LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>();
+ TestController testController;
+ LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::TailFile>();
+ LogTestController::getInstance().setDebug<core::ProcessSession>();
+ LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>();
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+ std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+ std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
+ std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
- utils::Identifier logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+ utils::Identifier processoruuid;
+ REQUIRE(true == processor->getUUID(processoruuid));
+ utils::Identifier logAttributeuuid;
+ REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
- connection->addRelationship(core::Relationship("success", "TailFile successful output"));
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+ connection->addRelationship(core::Relationship("success", "TailFile successful output"));
- // link the connections so that we can test results at the end for this
- connection->setDestination(connection);
+ // link the connections so that we can test results at the end for this
+ connection->setDestination(connection);
- connection->setSourceUUID(processoruuid);
+ connection->setSourceUUID(processoruuid);
- processor->addConnection(connection);
+ processor->addConnection(connection);
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+ std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n");
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+ core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+ context.setProperty(org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n");
+ context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
+ context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
- core::ProcessSession session(&context);
+ core::ProcessSession session(&context);
- REQUIRE(processor->getName() == "tailfile");
+ REQUIRE(processor->getName() == "tailfile");
- core::ProcessSessionFactory factory(&context);
+ core::ProcessSessionFactory factory(&context);
- std::shared_ptr<core::FlowFile> record;
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onSchedule(&context, &factory);
- processor->onTrigger(&context, &session);
+ std::shared_ptr<core::FlowFile> record;
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ processor->onSchedule(&context, &factory);
+ processor->onTrigger(&context, &session);
- provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
- std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
- record = session.get();
- REQUIRE(record == nullptr);
- std::shared_ptr<core::FlowFile> ff = session.get();
- REQUIRE(provRecords.size() == 4); // 2 creates and 2 modifies for flowfiles
+ provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+ std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
+ record = session.get();
+ REQUIRE(record == nullptr);
+ std::shared_ptr<core::FlowFile> ff = session.get();
+ REQUIRE(provRecords.size() == 4); // 2 creates and 2 modifies for flowfiles
- LogTestController::getInstance().reset();
- } catch (...) {
- }
+ LogTestController::getInstance().reset();
+ } catch (...) {
+ }
- // Delete the test and state file.
- std::remove(TMP_FILE);
- std::remove(STATE_FILE);
-}
+ // Delete the test and state file.
+ std::remove(TMP_FILE);
+ std::remove(STATE_FILE);
+ }
-TEST_CASE("TailFileWithoutDelimiter", "[tailfiletest2]") {
- try {
- // Create and write to the test file
- std::ofstream tmpfile;
- tmpfile.open(TMP_FILE);
- tmpfile << NEWLINE_FILE;
- tmpfile.close();
+ TEST_CASE("TailFileWithoutDelimiter", "[tailfiletest2]") {
+ try {
+ // Create and write to the test file
+ std::ofstream tmpfile;
+ tmpfile.open(TMP_FILE);
+ tmpfile << NEWLINE_FILE;
+ tmpfile.close();
- TestController testController;
- LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
+ TestController testController;
+ LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>();
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+ std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+ std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile");
+ std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
- utils::Identifier logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+ utils::Identifier processoruuid;
+ REQUIRE(true == processor->getUUID(processoruuid));
+ utils::Identifier logAttributeuuid;
+ REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
- std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
- connection->addRelationship(core::Relationship("success", "TailFile successful output"));
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
+ std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
+ connection->addRelationship(core::Relationship("success", "TailFile successful output"));
- // link the connections so that we can test results at the end for this
- connection->setDestination(connection);
- connection->setSourceUUID(processoruuid);
+ // link the connections so that we can test results at the end for this
+ connection->setDestination(connection);
+ connection->setSourceUUID(processoruuid);
- processor->addConnection(connection);
+ processor->addConnection(connection);
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+ std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
- context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+ core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo);
+ context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE);
+ context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE);
- core::ProcessSession session(&context);
+ core::ProcessSession session(&context);
- REQUIRE(processor->getName() == "tailfile");
+ REQUIRE(processor->getName() == "tailfile");
- core::ProcessSessionFactory factory(&context);
+ core::ProcessSessionFactory factory(&context);
- std::shared_ptr<core::FlowFile> record;
- processor->setScheduledState(core::ScheduledState::RUNNING);
- processor->onSchedule(&context, &factory);
- processor->onTrigger(&context, &session);
+ std::shared_ptr<core::FlowFile> record;
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ processor->onSchedule(&context, &factory);
+ processor->onTrigger(&context, &session);
- provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
- std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
- record = session.get();
- REQUIRE(record == nullptr);
- std::shared_ptr<core::FlowFile> ff = session.get();
- REQUIRE(provRecords.size() == 2);
+ provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+ std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents();
+ record = session.get();
+ REQUIRE(record == nullptr);
+ std::shared_ptr<core::FlowFile> ff = session.get();
+ REQUIRE(provRecords.size() == 2);
- LogTestController::getInstance().reset();
- } catch (...) {
- }
+ LogTestController::getInstance().reset();
+ } catch (...) {
+ }
- // Delete the test and state file.
- std::remove(TMP_FILE);
- std::remove(STATE_FILE);
-}
-*/
+ // Delete the test and state file.
+ std::remove(TMP_FILE);
+ std::remove(STATE_FILE);
+ }
+ */
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index c3ae370..dd91442 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -97,7 +97,8 @@
if (OPENSSL_FOUND)
set(TLS_SOURCES "src/io/tls/*.cpp")
endif(OPENSSL_FOUND)
-file(GLOB SOURCES "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp")
+
+file(GLOB SOURCES "src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp")
file(GLOB PROCESSOR_SOURCES "src/processors/*.cpp" )
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index 2158b47..daf3a20 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -36,7 +36,11 @@
MILLISECOND,
NANOSECOND
};
-
+#if defined(WIN32) || (__cplusplus >= 201103L && (!defined(__GLIBCXX__) || (__cplusplus >= 201402L) || (defined(_GLIBCXX_RELEASE) && _GLIBCXX_RELEASE > 4)))
+#define HAVE_REGEX_CPP 1
+#else
+#define HAVE_REGEX_CPP 0
+#endif
namespace org {
namespace apache {
namespace nifi {
diff --git a/libminifi/include/utils/file/PathUtils.h b/libminifi/include/utils/file/PathUtils.h
new file mode 100644
index 0000000..12925ce
--- /dev/null
+++ b/libminifi/include/utils/file/PathUtils.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_UTILS_PATHUTILS_H_
+#define LIBMINIFI_INCLUDE_UTILS_PATHUTILS_H_
+
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+namespace file {
+namespace PathUtils {
+
+/**
+ * Extracts the filename and path performing some validation of the path and output to ensure
+ * we don't provide invalid results.
+ * @param path input path
+ * @param filePath output file path
+ * @param fileName output file name
+ * @return result of the operation.
+ */
+extern bool getFileNameAndPath(const std::string &path, std::string &filePath, std::string &fileName);
+
+} /* namespace PathUtils */
+} /* namespace file */
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_UTILS_PATHUTILS_H_ */
diff --git a/libminifi/src/utils/file/PathUtils.cpp b/libminifi/src/utils/file/PathUtils.cpp
new file mode 100644
index 0000000..4982369
--- /dev/null
+++ b/libminifi/src/utils/file/PathUtils.cpp
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/file/PathUtils.h"
+#include "utils/file/FileUtils.h"
+#include <iostream>
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+namespace file {
+
+bool PathUtils::getFileNameAndPath(const std::string &path, std::string &filePath, std::string &fileName) {
+ const std::size_t found = path.find_last_of(FileUtils::get_separator());
+ /**
+ * Don't make an assumption about about the path, return false for this case.
+ * Could make the assumption that the path is just the file name but with the function named
+ * getFileNameAndPath we expect both to be there ( a fully qualified path ).
+ *
+ */
+ if (found == std::string::npos || found == path.length() - 1) {
+ return false;
+ }
+ if (found == 0) {
+ filePath = ""; // don't assume that path is not empty
+ filePath += FileUtils::get_separator();
+ fileName = path.substr(found + 1);
+ return true;
+ }
+ filePath = path.substr(0, found);
+ fileName = path.substr(found + 1);
+ return true;
+}
+
+} /* namespace file */
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
diff --git a/libminifi/test/unit/FileUtilsTests.cpp b/libminifi/test/unit/FileUtilsTests.cpp
index 85c783b..0abd3f6 100644
--- a/libminifi/test/unit/FileUtilsTests.cpp
+++ b/libminifi/test/unit/FileUtilsTests.cpp
@@ -23,6 +23,7 @@
#include "../TestBase.h"
#include "core/Core.h"
#include "utils/file/FileUtils.h"
+#include "utils/file/PathUtils.h"
using org::apache::nifi::minifi::utils::file::FileUtils;
@@ -81,6 +82,44 @@
#endif
}
+TEST_CASE("TestFilePath", "[TestGetFileNameAndPath]") {
+ SECTION("VALID FILE AND PATH") {
+ std::stringstream path;
+ path << "a" << FileUtils::get_separator() << "b" << FileUtils::get_separator() << "c";
+ std::stringstream file;
+ file << path.str() << FileUtils::get_separator() << "file";
+ std::string filename, filepath;
+ REQUIRE(true == utils::file::PathUtils::getFileNameAndPath(file.str(), filepath, filename) );
+ REQUIRE(path.str() == filepath);
+ REQUIRE("file" == filename);
+}
+SECTION("NO FILE VALID PATH") {
+ std::stringstream path;
+ path << "a" << FileUtils::get_separator() << "b" << FileUtils::get_separator() << "c" << FileUtils::get_separator();
+ std::string filename, filepath;
+ REQUIRE(false == utils::file::PathUtils::getFileNameAndPath(path.str(), filepath, filename) );
+ REQUIRE(filepath.empty());
+ REQUIRE(filename.empty());
+}
+SECTION("FILE NO PATH") {
+ std::string path = "/file";
+ std::string filename, filepath;
+ std::string expectedPath;
+ expectedPath += FileUtils::get_separator();
+ REQUIRE(true == utils::file::PathUtils::getFileNameAndPath(path, filepath, filename) );
+ REQUIRE(expectedPath == filepath);
+ REQUIRE("file" == filename);
+}
+SECTION("NO FILE NO PATH") {
+ std::string path = "file";
+ std::string filename, filepath;
+ std::string expectedPath = "" + FileUtils::get_separator();
+ REQUIRE(false == utils::file::PathUtils::getFileNameAndPath(path, filepath, filename) );
+ REQUIRE(filepath.empty());
+ REQUIRE(filename.empty());
+}
+}
+
TEST_CASE("TestFileUtils::get_executable_path", "[TestGetExecutablePath]") {
std::string executable_path = FileUtils::get_executable_path();
std::cerr << "Executable path: " << executable_path << std::endl;