MINIFICPP-2146 Add support for SMB networking protocol
Also fixes these:
MINIFICPP-2147 PutSmb
MINIFICPP-2148 FetchSmb
MINIFICPP-2150 ListSmb
Signed-off-by: Ferenc Gerlits <fgerlits@gmail.com>
This closes #1634
diff --git a/CMakeLists.txt b/CMakeLists.txt
index e74e7ed..ca15622 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -106,7 +106,7 @@
# Enable usage of the VERSION specifier
if (WIN32)
add_compile_definitions(WIN32_LEAN_AND_MEAN _CRT_SECURE_NO_WARNINGS NOMINMAX)
- add_compile_options(/W3 /utf-8)
+ add_compile_options(/W3 /utf-8 /bigobj)
endif()
if (NOT PORTABLE)
diff --git a/PROCESSORS.md b/PROCESSORS.md
index a865342..6da9203 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -46,6 +46,7 @@
- [FetchOPCProcessor](#fetchopcprocessor)
- [FetchS3Object](#fetchs3object)
- [FetchSFTP](#fetchsftp)
+- [FetchSmb](#fetchsmb)
- [FocusArchiveEntry](#focusarchiveentry)
- [GenerateFlowFile](#generateflowfile)
- [GetEnvironmentalSensors](#getenvironmentalsensors)
@@ -66,6 +67,7 @@
- [ListGCSBucket](#listgcsbucket)
- [ListS3](#lists3)
- [ListSFTP](#listsftp)
+- [ListSmb](#listsmb)
- [LogAttribute](#logattribute)
- [ManipulateArchive](#manipulatearchive)
- [MergeContent](#mergecontent)
@@ -82,6 +84,7 @@
- [PutOPCProcessor](#putopcprocessor)
- [PutS3Object](#puts3object)
- [PutSFTP](#putsftp)
+- [PutSmb](#putsmb)
- [PutSplunkHTTP](#putsplunkhttp)
- [PutSQL](#putsql)
- [PutTCP](#puttcp)
@@ -1027,6 +1030,36 @@
| permission.denied | Any FlowFile that could not be fetched from the remote server due to insufficient permissions will be transferred to this Relationship. |
+## FetchSmb
+
+### Description
+
+Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+|---------------------------------------|---------------|------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **SMB Connection Controller Service** | | | Specifies the SMB connection controller service to use for connecting to the SMB server. |
+| Input Directory | | | The full path of the file to be retrieved from the remote server. If left empty, the path and filename attributes of the incoming flow file will be used.<br/>**Supports Expression Language: true** |
+
+### Relationships
+
+| Name | Description |
+|---------|--------------------------------------------------------------------|
+| success | A flowfile will be routed here for each successfully fetched file. |
+| failure | A flowfile will be routed here when failed to fetch its content. |
+
+### Output Attributes
+
+| Attribute | Relationship | Description |
+|---------------|--------------|-------------------------------------------------------------------|
+| error.code | failure | The error code returned by SMB when the fetch of a file fails. |
+| error.message | failure | The error message returned by SMB when the fetch of a file fails. |
+
+
## FocusArchiveEntry
### Description
@@ -1689,6 +1722,48 @@
| success | All FlowFiles that are received are routed to success |
+## ListSmb
+
+### Description
+
+Retrieves a listing of files from an SMB share. For each file that is listed, creates a FlowFile that represents the file so that it can be fetched in conjunction with FetchSmb.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+|---------------------------------------|---------------|------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **SMB Connection Controller Service** | | | Specifies the SMB connection controller service to use for connecting to the SMB server. |
+| Input Directory | | | The input directory to list the contents of |
+| **Recurse Subdirectories** | true | | Indicates whether to list files from subdirectories of the directory |
+| File Filter | | | Only files whose names match the given regular expression will be picked up |
+| Path Filter | | | When Recurse Subdirectories is true, then only subdirectories whose path matches the given regular expression will be scanned |
+| **Minimum File Age** | 5 sec | | The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored |
+| Maximum File Age | | | The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored |
+| Minimum File Size | | | The minimum size that a file must be in order to be pulled |
+| Maximum File Size | | | The maximum size that a file can be in order to be pulled |
+| **Ignore Hidden Files** | true | | Indicates whether or not hidden files should be ignored |
+
+### Relationships
+
+| Name | Description |
+|---------|-------------------------------------------------------|
+| success | All FlowFiles that are received are routed to success |
+
+### Output Attributes
+
+| Attribute | Relationship | Description |
+|------------------|--------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| filename | success | The name of the file that was read from filesystem. |
+| path | success | The path is set to the relative path of the file's directory on the remote filesystem compared to the Share root directory. For example, for a given remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listed from smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file then the path attribute will be set to "sub/folder". |
+| serviceLocation | success | The SMB URL of the share. |
+| lastModifiedTime | success | The timestamp of when the file's content changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ss'. |
+| creationTime | success | The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ss'. |
+| lastAccessTime | success | The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ss'. |
+| size | success | The size of the file in bytes. |
+
+
## LogAttribute
### Description
@@ -2360,6 +2435,31 @@
| failure | FlowFiles that failed to send to the remote system; failure is usually looped back to this processor |
+## PutSmb
+
+### Description
+
+Writes the contents of a FlowFile to an smb network location
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+|---------------------------------------|---------------|-----------------------------|-----------------------------------------------------------------------------------------------------------------------|
+| **SMB Connection Controller Service** | | | Specifies the SMB connection controller service to use for connecting to the SMB server. |
+| Directory | . | | The output directory to which to put files<br/>**Supports Expression Language: true** |
+| Conflict Resolution Strategy | fail | fail<br/>replace<br/>ignore | Indicates what should happen when a file with the same name already exists in the output directory |
+| **Create Missing Directories** | true | | If true, then missing destination directories will be created. If false, flowfiles are penalized and sent to failure. |
+
+### Relationships
+
+| Name | Description |
+|---------|-------------------------------------------------------------------------|
+| success | All files are routed to success |
+| failure | Failed files (conflict, write failure, etc.) are transferred to failure |
+
+
## PutSplunkHTTP
### Description
diff --git a/README.md b/README.md
index b46bb34..024d555 100644
--- a/README.md
+++ b/README.md
@@ -95,6 +95,7 @@
| ProcFs (Linux) | [ProcFsMonitor](PROCESSORS.md#procfsmonitor) | -DENABLE_PROCFS=ON |
| Python Scripting | [ExecuteScript](PROCESSORS.md#executescript)<br>[ExecutePythonProcessor](PROCESSORS.md#executepythonprocessor)<br/>**Custom Python Processors** | -DENABLE_PYTHON_SCRIPTING=ON |
| Sensors | GetEnvironmentalSensors<br/>GetMovementSensors | -DENABLE_SENSORS=ON |
+| SMB (Windows) | [FetchSmb](PROCESSORS.md#fetchsmb)<br/>[ListSmb](PROCESSORS.md#listsmb)<br/>[PutSmb](PROCESSORS.md#putsmb) | -DENABLE_SMB=ON |
| SFTP | [FetchSFTP](PROCESSORS.md#fetchsftp)<br/>[ListSFTP](PROCESSORS.md#listsftp)<br/>[PutSFTP](PROCESSORS.md#putsftp) | -DENABLE_SFTP=ON |
| SQL | [ExecuteSQL](PROCESSORS.md#executesql)<br/>[PutSQL](PROCESSORS.md#putsql)<br/>[QueryDatabaseTable](PROCESSORS.md#querydatabasetable)<br/> | -DENABLE_SQL=ON |
| Splunk | [PutSplunkHTTP](PROCESSORS.md#putsplunkhttp)<br/>[QuerySplunkIndexingStatus](PROCESSORS.md#querysplunkindexingstatus) | -DENABLE_SPLUNK=ON |
diff --git a/cmake/MiNiFiOptions.cmake b/cmake/MiNiFiOptions.cmake
index c20e7b9..c020820 100644
--- a/cmake/MiNiFiOptions.cmake
+++ b/cmake/MiNiFiOptions.cmake
@@ -85,6 +85,7 @@
add_minifi_option(MSI_REDISTRIBUTE_UCRT_NONASL "Redistribute Universal C Runtime DLLs with the MSI generated by CPack. The resulting MSI is not distributable under Apache 2.0." OFF)
add_minifi_option(ENABLE_WEL "Enables the suite of Windows Event Log extensions." OFF)
add_minifi_option(ENABLE_PDH "Enables PDH support." OFF)
+ add_minifi_option(ENABLE_SMB "Enables SMB support." ON)
endif()
if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
diff --git a/extensions/bustache/tests/ApplyTemplateTests.cpp b/extensions/bustache/tests/ApplyTemplateTests.cpp
index 0a908c2..a0144f9 100644
--- a/extensions/bustache/tests/ApplyTemplateTests.cpp
+++ b/extensions/bustache/tests/ApplyTemplateTests.cpp
@@ -17,11 +17,8 @@
*/
#include <fstream>
-#include <map>
#include <memory>
-#include <utility>
#include <string>
-#include <set>
#include <iostream>
#include "TestBase.h"
@@ -30,9 +27,7 @@
#include "core/FlowFile.h"
#include "core/Processor.h"
-#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
-#include "core/ProcessorNode.h"
#include "repository/VolatileContentRepository.h"
#include "unit/ProvenanceTestHelper.h"
@@ -41,6 +36,8 @@
#include "processors/PutFile.h"
#include "processors/ExtractText.h"
+namespace org::apache::nifi::minifi::processors::test {
+
const char* TEMPLATE = "TemplateBegins\n{{ ExampleAttribute }}\nTemplateEnds";
const char* TEMPLATE_FILE = "test_template.txt";
const char* TEST_ATTR = "ExampleAttribute";
@@ -49,75 +46,76 @@
const char* EXPECT_OUTPUT = "TemplateBegins\nExampleValue\nTemplateEnds";
TEST_CASE("Test Creation of ApplyTemplate", "[ApplyTemplateCreate]") {
- TestController testController;
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ApplyTemplate>("processor_name");
- REQUIRE(processor->getName() == "processor_name");
- REQUIRE(processor->getUUID());
+ TestController testController;
+ std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ApplyTemplate>("processor_name");
+ REQUIRE(processor->getName() == "processor_name");
+ REQUIRE(processor->getUUID());
}
TEST_CASE("Test usage of ApplyTemplate", "[ApplyTemplateTest]") {
- TestController testController;
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::ApplyTemplate>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::PutFile>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::GetFile>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::ExtractText>();
- LogTestController::getInstance().setTrace<core::ProcessSession>();
- LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::FlowFile>();
+ TestController testController;
+ LogTestController::getInstance().setTrace<ApplyTemplate>();
+ LogTestController::getInstance().setTrace<PutFile>();
+ LogTestController::getInstance().setTrace<GetFile>();
+ LogTestController::getInstance().setTrace<ExtractText>();
+ LogTestController::getInstance().setTrace<core::ProcessSession>();
+ LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+ LogTestController::getInstance().setTrace<Connection>();
+ LogTestController::getInstance().setTrace<core::Connectable>();
+ LogTestController::getInstance().setTrace<core::FlowFile>();
- std::shared_ptr<TestPlan> plan = testController.createPlan();
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
- auto get_file_source_dir = testController.createTempDirectory();
- auto template_source_dir = testController.createTempDirectory();
- auto put_file_destination_dir = testController.createTempDirectory();
+ auto get_file_source_dir = testController.createTempDirectory();
+ auto template_source_dir = testController.createTempDirectory();
+ auto put_file_destination_dir = testController.createTempDirectory();
- REQUIRE_FALSE(get_file_source_dir.empty());
- REQUIRE_FALSE(template_source_dir.empty());
- REQUIRE_FALSE(put_file_destination_dir.empty());
+ REQUIRE_FALSE(get_file_source_dir.empty());
+ REQUIRE_FALSE(template_source_dir.empty());
+ REQUIRE_FALSE(put_file_destination_dir.empty());
- std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getFile");
- plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory, get_file_source_dir.string());
- plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::KeepSourceFile, "true");
+ std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getFile");
+ plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory, get_file_source_dir.string());
+ plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::KeepSourceFile, "true");
- std::shared_ptr<core::Processor> extract_text = plan->addProcessor("ExtractText", "testExtractText", core::Relationship("success", "description"), true);
- plan->setProperty(extract_text, org::apache::nifi::minifi::processors::ExtractText::Attribute, TEST_ATTR);
+ std::shared_ptr<core::Processor> extract_text = plan->addProcessor("ExtractText", "testExtractText", core::Relationship("success", "description"), true);
+ plan->setProperty(extract_text, org::apache::nifi::minifi::processors::ExtractText::Attribute, TEST_ATTR);
- std::shared_ptr<core::Processor> apply_template = plan->addProcessor("ApplyTemplate", "testApplyTemplate", core::Relationship("success", "description"), true);
+ std::shared_ptr<core::Processor> apply_template = plan->addProcessor("ApplyTemplate", "testApplyTemplate", core::Relationship("success", "description"), true);
- std::shared_ptr<core::Processor> put_file = plan->addProcessor("PutFile", "put_file", core::Relationship("success", "description"), true);
- plan->setProperty(put_file, org::apache::nifi::minifi::processors::PutFile::Directory, put_file_destination_dir.string());
- plan->setProperty(put_file, org::apache::nifi::minifi::processors::PutFile::ConflictResolution,
- org::apache::nifi::minifi::processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE);
+ std::shared_ptr<core::Processor> put_file = plan->addProcessor("PutFile", "put_file", core::Relationship("success", "description"), true);
+ plan->setProperty(put_file, org::apache::nifi::minifi::processors::PutFile::Directory, put_file_destination_dir.string());
+ plan->setProperty(put_file, org::apache::nifi::minifi::processors::PutFile::ConflictResolution, magic_enum::enum_name(minifi::processors::PutFile::FileExistsResolutionStrategy::replace));
- // Write attribute value to file for GetFile->ExtractText
+ // Write attribute value to file for GetFile->ExtractText
- std::ofstream test_file(get_file_source_dir / TEST_FILE);
- REQUIRE(test_file.is_open());
- test_file << TEST_VALUE;
- test_file.close();
+ std::ofstream test_file(get_file_source_dir / TEST_FILE);
+ REQUIRE(test_file.is_open());
+ test_file << TEST_VALUE;
+ test_file.close();
- // Write template to file
- auto template_path = template_source_dir / TEMPLATE_FILE;
- std::ofstream template_file(template_path);
- REQUIRE(template_file.is_open());
- template_file << TEMPLATE;
- template_file.close();
+ // Write template to file
+ auto template_path = template_source_dir / TEMPLATE_FILE;
+ std::ofstream template_file(template_path);
+ REQUIRE(template_file.is_open());
+ template_file << TEMPLATE;
+ template_file.close();
- plan->setProperty(apply_template, org::apache::nifi::minifi::processors::ApplyTemplate::Template, template_path.string());
+ plan->setProperty(apply_template, org::apache::nifi::minifi::processors::ApplyTemplate::Template, template_path.string());
- // Run processor chain
- plan->runNextProcessor(); // GetFile
- plan->runNextProcessor(); // ExtractText
- plan->runNextProcessor(); // ApplyTemplate
- plan->runNextProcessor(); // PutFile
+ // Run processor chain
+ plan->runNextProcessor(); // GetFile
+ plan->runNextProcessor(); // ExtractText
+ plan->runNextProcessor(); // ApplyTemplate
+ plan->runNextProcessor(); // PutFile
- // Read contents of file
- std::ifstream output_file(put_file_destination_dir / TEST_FILE);
- std::stringstream output_buf;
- output_buf << output_file.rdbuf();
- std::string output_contents = output_buf.str();
- REQUIRE(output_contents == EXPECT_OUTPUT);
+ // Read contents of file
+ std::ifstream output_file(put_file_destination_dir / TEST_FILE);
+ std::stringstream output_buf;
+ output_buf << output_file.rdbuf();
+ std::string output_contents = output_buf.str();
+ REQUIRE(output_contents == EXPECT_OUTPUT);
}
+
+} // namespace org::apache::nifi::minifi::processors::test
diff --git a/extensions/expression-language/tests/ExpressionLanguageTests.cpp b/extensions/expression-language/tests/ExpressionLanguageTests.cpp
index 0fa4793..325e8fd 100644
--- a/extensions/expression-language/tests/ExpressionLanguageTests.cpp
+++ b/extensions/expression-language/tests/ExpressionLanguageTests.cpp
@@ -219,7 +219,7 @@
plan->addProcessor("LogAttribute", "LogAttribute", core::Relationship("success", "description"), true);
auto put_file = plan->addProcessor("PutFile", "PutFile", core::Relationship("success", "description"), true);
plan->setProperty(put_file, minifi::processors::PutFile::Directory, (out_dir / "${extracted_attr_name}").string());
- plan->setProperty(put_file, minifi::processors::PutFile::ConflictResolution, minifi::processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE);
+ plan->setProperty(put_file, minifi::processors::PutFile::ConflictResolution, magic_enum::enum_name(minifi::processors::PutFile::FileExistsResolutionStrategy::replace));
plan->setProperty(put_file, minifi::processors::PutFile::CreateDirs, "true");
// Write test input
diff --git a/extensions/http-curl/tests/unit/AlertTests.cpp b/extensions/http-curl/tests/unit/AlertTests.cpp
index 72fcbec..2077415 100644
--- a/extensions/http-curl/tests/unit/AlertTests.cpp
+++ b/extensions/http-curl/tests/unit/AlertTests.cpp
@@ -45,7 +45,7 @@
std::string id(doc["agentId"].GetString(), doc["agentId"].GetStringLength());
REQUIRE(id == agent_id_);
std::vector<std::string> batch;
- for (size_t i = 0; i < doc["alerts"].Size(); ++i) {
+ for (rapidjson::SizeType i = 0; i < doc["alerts"].Size(); ++i) {
REQUIRE(doc["alerts"][i].IsString());
batch.emplace_back(doc["alerts"][i].GetString(), doc["alerts"][i].GetStringLength());
}
diff --git a/extensions/python/ExecutePythonProcessor.h b/extensions/python/ExecutePythonProcessor.h
index e2adb26..08df517 100644
--- a/extensions/python/ExecutePythonProcessor.h
+++ b/extensions/python/ExecutePythonProcessor.h
@@ -130,7 +130,7 @@
std::string script_to_exec_;
bool reload_on_script_change_;
- std::optional<std::filesystem::file_time_type> last_script_write_time_;
+ std::optional<std::chrono::file_clock::time_point> last_script_write_time_;
std::string script_file_path_;
std::shared_ptr<core::logging::Logger> python_logger_;
std::unique_ptr<PythonScriptEngine> python_script_engine_;
diff --git a/extensions/python/tests/ExecutePythonProcessorTests.cpp b/extensions/python/tests/ExecutePythonProcessorTests.cpp
index 7cea60c..fe4a6f0 100644
--- a/extensions/python/tests/ExecutePythonProcessorTests.cpp
+++ b/extensions/python/tests/ExecutePythonProcessorTests.cpp
@@ -53,7 +53,6 @@
plan_ = testController_->createPlan();
logTestController_.setDebug<TestPlan>();
logTestController_.setDebug<minifi::processors::PutFile>();
- logTestController_.setDebug<minifi::processors::PutFile::ReadCallback>();
}
auto getScriptFullPath(const std::filesystem::path& script_file_name) {
diff --git a/extensions/sftp/tests/FetchSFTPTests.cpp b/extensions/sftp/tests/FetchSFTPTests.cpp
index bc01678..3148e7b 100644
--- a/extensions/sftp/tests/FetchSFTPTests.cpp
+++ b/extensions/sftp/tests/FetchSFTPTests.cpp
@@ -101,7 +101,7 @@
// Configure PutFile processor
plan->setProperty(put_file, "Directory", (dst_dir / "${path}").string());
- plan->setProperty(put_file, "Conflict Resolution Strategy", minifi::processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL);
+ plan->setProperty(put_file, "Conflict Resolution Strategy", magic_enum::enum_name(minifi::processors::PutFile::FileExistsResolutionStrategy::fail));
plan->setProperty(put_file, "Create Missing Directories", "true");
}
diff --git a/extensions/sftp/tests/ListSFTPTests.cpp b/extensions/sftp/tests/ListSFTPTests.cpp
index f0c7ea6..fd554be 100644
--- a/extensions/sftp/tests/ListSFTPTests.cpp
+++ b/extensions/sftp/tests/ListSFTPTests.cpp
@@ -133,7 +133,7 @@
}
// Create source file
- void createFile(const std::filesystem::path& relative_path, const std::string& content, std::optional<std::filesystem::file_time_type> modification_time) {
+ void createFile(const std::filesystem::path& relative_path, const std::string& content, std::optional<std::chrono::file_clock::time_point> modification_time) {
std::fstream file;
std::filesystem::path full_path = src_dir / "vfs" / relative_path;
std::filesystem::create_directories(full_path.parent_path());
diff --git a/extensions/sftp/tests/ListThenFetchSFTPTests.cpp b/extensions/sftp/tests/ListThenFetchSFTPTests.cpp
index 359217b..dbe31d2 100644
--- a/extensions/sftp/tests/ListThenFetchSFTPTests.cpp
+++ b/extensions/sftp/tests/ListThenFetchSFTPTests.cpp
@@ -138,7 +138,7 @@
// Configure PutFile processor
plan->setProperty(put_file, "Directory", (dst_dir / "${path}").string());
- plan->setProperty(put_file, "Conflict Resolution Strategy", minifi::processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL);
+ plan->setProperty(put_file, "Conflict Resolution Strategy", magic_enum::enum_name(minifi::processors::PutFile::FileExistsResolutionStrategy::fail));
plan->setProperty(put_file, "Create Missing Directories", "true");
}
@@ -147,7 +147,7 @@
}
// Create source file
- void createFile(const std::string& relative_path, const std::string& content, std::optional<std::filesystem::file_time_type> modification_time) {
+ void createFile(const std::string& relative_path, const std::string& content, std::optional<std::chrono::file_clock::time_point> modification_time) {
std::fstream file;
std::filesystem::path full_path = src_dir / "vfs" / relative_path;
std::filesystem::create_directories(full_path.parent_path());
diff --git a/extensions/sftp/tests/PutSFTPTests.cpp b/extensions/sftp/tests/PutSFTPTests.cpp
index 96aeed9..b6cc046 100644
--- a/extensions/sftp/tests/PutSFTPTests.cpp
+++ b/extensions/sftp/tests/PutSFTPTests.cpp
@@ -154,7 +154,7 @@
REQUIRE(false == file.good());
}
- void testModificationTime(const std::string& relative_path, std::filesystem::file_time_type mtime) {
+ void testModificationTime(const std::string& relative_path, std::chrono::file_clock::time_point mtime) {
auto result_file = dst_dir / "vfs" / relative_path;
REQUIRE(mtime == utils::file::last_write_time(result_file).value());
}
diff --git a/extensions/smb/CMakeLists.txt b/extensions/smb/CMakeLists.txt
new file mode 100644
index 0000000..4cb049f
--- /dev/null
+++ b/extensions/smb/CMakeLists.txt
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if (NOT (WIN32 AND ENABLE_SMB))
+ return()
+endif()
+
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+
+file(GLOB SOURCES "*.cpp")
+
+add_library(minifi-smb SHARED ${SOURCES})
+target_link_libraries(minifi-smb ${LIBMINIFI} Mpr)
+target_include_directories(minifi-smb PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+
+register_extension(minifi-smb "SMB EXTENSIONS" SMB-EXTENSIONS "This enables SMB support" "extensions/smb/tests")
+
+register_extension_linter(minifi-smb-extensions-linter)
diff --git a/extensions/smb/FetchSmb.cpp b/extensions/smb/FetchSmb.cpp
new file mode 100644
index 0000000..19c7a84
--- /dev/null
+++ b/extensions/smb/FetchSmb.cpp
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchSmb.h"
+#include "core/Resource.h"
+#include "utils/file/FileReaderCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::smb {
+
+void FetchSmb::initialize() {
+ setSupportedProperties(Properties);
+ setSupportedRelationships(Relationships);
+}
+
+void FetchSmb::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+ gsl_Expects(context);
+ smb_connection_controller_service_ = SmbConnectionControllerService::getFromProperty(*context, FetchSmb::ConnectionControllerService);
+}
+
+namespace {
+std::filesystem::path getTargetRelativePath(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+ auto remote_file = context.getProperty(FetchSmb::RemoteFile, flow_file);
+ if (remote_file && !remote_file->empty()) {
+ return std::filesystem::path{*remote_file}.relative_path(); // We need to make sure that the path remains relative (e.g. ${path}/foo where ${path} is empty can lead to /foo)
+ }
+ std::filesystem::path path = flow_file->getAttribute(core::SpecialFlowAttribute::PATH).value_or("");
+ std::filesystem::path filename = flow_file->getAttribute(core::SpecialFlowAttribute::FILENAME).value_or("");
+ return path / filename;
+}
+} // namespace
+
+void FetchSmb::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+ gsl_Expects(context && session && smb_connection_controller_service_);
+
+ if (auto connection_error = smb_connection_controller_service_->validateConnection()) {
+ logger_->log_error("Couldn't establish connection to the specified network location due to %s", connection_error.message());
+ context->yield();
+ return;
+ }
+
+ auto flow_file = session->get();
+ if (!flow_file) {
+ context->yield();
+ return;
+ }
+
+ try {
+ session->write(flow_file, utils::FileReaderCallback{smb_connection_controller_service_->getPath() / getTargetRelativePath(*context, flow_file)});
+ session->transfer(flow_file, Success);
+ } catch (const utils::FileReaderCallbackIOError& io_error) {
+ flow_file->addAttribute(ErrorCode.name, std::to_string(io_error.error_code));
+ flow_file->addAttribute(ErrorMessage.name, io_error.what());
+ session->transfer(flow_file, Failure);
+ }
+}
+
+REGISTER_RESOURCE(FetchSmb, Processor);
+
+} // namespace org::apache::nifi::minifi::extensions::smb
diff --git a/extensions/smb/FetchSmb.h b/extensions/smb/FetchSmb.h
new file mode 100644
index 0000000..c1f0286
--- /dev/null
+++ b/extensions/smb/FetchSmb.h
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <regex>
+#include <string>
+#include <utility>
+
+#include "SmbConnectionControllerService.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Property.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/OutputAttributeDefinition.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Enum.h"
+#include "utils/ListingStateManager.h"
+#include "utils/file/ListedFile.h"
+#include "utils/file/FileUtils.h"
+
+namespace org::apache::nifi::minifi::extensions::smb {
+
+class FetchSmb : public core::Processor {
+ public:
+ explicit FetchSmb(std::string name, const utils::Identifier& uuid = {})
+ : core::Processor(std::move(name), uuid) {
+ }
+
+ EXTENSIONAPI static constexpr const char* Description = "Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.";
+
+ EXTENSIONAPI static constexpr auto ConnectionControllerService = core::PropertyDefinitionBuilder<>::createProperty("SMB Connection Controller Service")
+ .withDescription("Specifies the SMB connection controller service to use for connecting to the SMB server. "
+ "If the SMB share is auto-mounted to a drive letter, its recommended to use FetchFile instead.")
+ .isRequired(true)
+ .withAllowedTypes<SmbConnectionControllerService>()
+ .build();
+ EXTENSIONAPI static constexpr auto RemoteFile = core::PropertyDefinitionBuilder<>::createProperty("Input Directory")
+ .withDescription("The full path of the file to be retrieved from the remote server. If left empty, the path and filename attributes of the incoming flow file will be used.")
+ .isRequired(false)
+ .supportsExpressionLanguage(true)
+ .build();
+ EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 2>{
+ ConnectionControllerService,
+ RemoteFile
+ };
+
+ EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "A flowfile will be routed here for each successfully fetched file."};
+ EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "A flowfile will be routed here when failed to fetch its content."};
+ EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure};
+
+ EXTENSIONAPI static constexpr auto ErrorCode = core::OutputAttributeDefinition<>{"error.code", { Failure }, "The error code returned by SMB when the fetch of a file fails."};
+ EXTENSIONAPI static constexpr auto ErrorMessage = core::OutputAttributeDefinition<>{"error.message", { Failure }, "The error message returned by SMB when the fetch of a file fails."};
+
+ EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 2>{ FetchSmb::ErrorCode, ErrorMessage };
+
+ EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+ EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+ EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
+ EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+
+ ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+ void initialize() override;
+ void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &session_factory) override;
+ void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+
+ private:
+ std::shared_ptr<SmbConnectionControllerService> smb_connection_controller_service_;
+ std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<FetchSmb>::getLogger(uuid_);
+};
+
+} // namespace org::apache::nifi::minifi::extensions::smb
diff --git a/extensions/smb/ListSmb.cpp b/extensions/smb/ListSmb.cpp
new file mode 100644
index 0000000..99fb8f7
--- /dev/null
+++ b/extensions/smb/ListSmb.cpp
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListSmb.h"
+#include <filesystem>
+
+#include "utils/StringUtils.h"
+#include "utils/TimeUtil.h"
+#include "utils/OsUtils.h"
+#include "utils/file/FileUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::extensions::smb {
+
+void ListSmb::initialize() {
+ setSupportedProperties(Properties);
+ setSupportedRelationships(Relationships);
+}
+
+void ListSmb::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &/*sessionFactory*/) {
+ gsl_Expects(context);
+ smb_connection_controller_service_ = SmbConnectionControllerService::getFromProperty(*context, ListSmb::ConnectionControllerService);
+
+ auto state_manager = context->getStateManager();
+ if (state_manager == nullptr) {
+ throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+ }
+ state_manager_ = std::make_unique<minifi::utils::ListingStateManager>(state_manager);
+
+ input_directory_ = context->getProperty(InputDirectory).value_or("");
+
+ context->getProperty(RecurseSubdirectories, recurse_subdirectories_);
+
+ std::string value;
+ if (context->getProperty(FileFilter, value) && !value.empty()) {
+ file_filter_.filename_filter = std::regex(value);
+ }
+
+ if (recurse_subdirectories_ && context->getProperty(PathFilter, value) && !value.empty()) {
+ file_filter_.path_filter = std::regex(value);
+ }
+
+ if (auto minimum_file_age = context->getProperty<core::TimePeriodValue>(MinimumFileAge)) {
+ file_filter_.minimum_file_age = minimum_file_age->getMilliseconds();
+ }
+
+ if (auto maximum_file_age = context->getProperty<core::TimePeriodValue>(MaximumFileAge)) {
+ file_filter_.maximum_file_age = maximum_file_age->getMilliseconds();
+ }
+
+ if (const auto minimum_file_size = context->getProperty<core::DataSizeValue>(MinimumFileSize)) {
+ file_filter_.minimum_file_size = minimum_file_size->getValue();
+ }
+
+ if (const auto maximum_file_size = context->getProperty<core::DataSizeValue>(MaximumFileSize)) {
+ file_filter_.maximum_file_size = maximum_file_size->getValue();
+ }
+
+ context->getProperty(IgnoreHiddenFiles, file_filter_.ignore_hidden_files);
+}
+
+namespace {
+std::string getDateTimeStr(std::chrono::file_clock::time_point time_point) {
+ return utils::timeutils::getDateTimeStr(std::chrono::time_point_cast<std::chrono::seconds>(utils::file::to_sys(time_point)));
+}
+}
+
+std::shared_ptr<core::FlowFile> ListSmb::createFlowFile(core::ProcessSession& session, const utils::ListedFile& listed_file) {
+ auto flow_file = session.create();
+ session.putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, listed_file.getPath().filename().string());
+ session.putAttribute(flow_file, core::SpecialFlowAttribute::ABSOLUTE_PATH, (listed_file.getPath().parent_path() / "").string());
+
+ auto relative_path = std::filesystem::relative(listed_file.getPath().parent_path(), smb_connection_controller_service_->getPath());
+ session.putAttribute(flow_file, core::SpecialFlowAttribute::PATH, (relative_path / "").string());
+
+ session.putAttribute(flow_file, Size.name, std::to_string(utils::file::file_size(listed_file.getPath())));
+
+ if (auto windows_file_times = utils::file::getWindowsFileTimes(listed_file.getPath())) {
+ session.putAttribute(flow_file, CreationTime.name, getDateTimeStr(windows_file_times->creation_time));
+ session.putAttribute(flow_file, LastModifiedTime.name, getDateTimeStr(windows_file_times->last_write_time));
+ session.putAttribute(flow_file, LastAccessTime.name, getDateTimeStr(windows_file_times->last_access_time));
+ } else {
+ logger_->log_warn("Could not get file attributes due to %s", windows_file_times.error().message());
+ }
+
+ session.putAttribute(flow_file, ServiceLocation.name, smb_connection_controller_service_->getPath().string());
+
+
+ return flow_file;
+}
+
+void ListSmb::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+ gsl_Expects(context && session && smb_connection_controller_service_);
+
+ if (auto connection_error = smb_connection_controller_service_->validateConnection()) {
+ logger_->log_error("Couldn't establish connection to the specified network location due to %s", connection_error.message());
+ context->yield();
+ return;
+ }
+
+ auto stored_listing_state = state_manager_->getCurrentState();
+ auto latest_listing_state = stored_listing_state;
+ uint32_t files_listed = 0;
+
+ auto dir = smb_connection_controller_service_->getPath() / input_directory_;
+ auto process_files = [&](const std::filesystem::path& path, const std::filesystem::path& filename) {
+ auto listed_file = utils::ListedFile(path / filename, dir);
+
+ if (stored_listing_state.wasObjectListedAlready(listed_file) || !listed_file.matches(file_filter_)) {
+ return true;
+ }
+
+ session->transfer(createFlowFile(*session, listed_file), Success);
+ ++files_listed;
+ latest_listing_state.updateState(listed_file);
+ return true;
+ };
+ utils::file::list_dir(dir, process_files, logger_, recurse_subdirectories_);
+
+ state_manager_->storeState(latest_listing_state);
+
+ if (files_listed == 0) {
+ logger_->log_debug("No new files were found in input directory '%s' to list", dir.string());
+ context->yield();
+ }
+}
+
+REGISTER_RESOURCE(ListSmb, Processor);
+
+} // namespace org::apache::nifi::minifi::extensions::smb
diff --git a/extensions/smb/ListSmb.h b/extensions/smb/ListSmb.h
new file mode 100644
index 0000000..a2f4208
--- /dev/null
+++ b/extensions/smb/ListSmb.h
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <regex>
+#include <string>
+#include <utility>
+
+#include "SmbConnectionControllerService.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Property.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/OutputAttributeDefinition.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Enum.h"
+#include "utils/ListingStateManager.h"
+#include "utils/file/ListedFile.h"
+#include "utils/file/FileUtils.h"
+
+namespace org::apache::nifi::minifi::extensions::smb {
+
+class ListSmb : public core::Processor {
+ public:
+ explicit ListSmb(std::string name, const utils::Identifier& uuid = {})
+ : core::Processor(std::move(name), uuid) {
+ }
+
+ EXTENSIONAPI static constexpr const char* Description = "Retrieves a listing of files from an SMB share. For each file that is listed, "
+ "creates a FlowFile that represents the file so that it can be fetched in conjunction with FetchSmb.";
+
+ EXTENSIONAPI static constexpr auto ConnectionControllerService = core::PropertyDefinitionBuilder<>::createProperty("SMB Connection Controller Service")
+ .withDescription("Specifies the SMB connection controller service to use for connecting to the SMB server. "
+ "If the SMB share is auto-mounted to a drive letter, its recommended to use ListFile instead.")
+ .isRequired(true)
+ .withAllowedTypes<SmbConnectionControllerService>()
+ .build();
+ EXTENSIONAPI static constexpr auto InputDirectory = core::PropertyDefinitionBuilder<>::createProperty("Input Directory")
+ .withDescription("The input directory to list the contents of")
+ .isRequired(false)
+ .build();
+ EXTENSIONAPI static constexpr auto RecurseSubdirectories = core::PropertyDefinitionBuilder<>::createProperty("Recurse Subdirectories")
+ .withDescription("Indicates whether to list files from subdirectories of the directory")
+ .withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
+ .withDefaultValue("true")
+ .isRequired(true)
+ .build();
+ EXTENSIONAPI static constexpr auto FileFilter = core::PropertyDefinitionBuilder<>::createProperty("File Filter")
+ .withDescription("Only files whose names match the given regular expression will be picked up")
+ .build();
+ EXTENSIONAPI static constexpr auto PathFilter = core::PropertyDefinitionBuilder<>::createProperty("Path Filter")
+ .withDescription("When Recurse Subdirectories is true, then only subdirectories whose path matches the given regular expression will be scanned")
+ .build();
+ EXTENSIONAPI static constexpr auto MinimumFileAge = core::PropertyDefinitionBuilder<>::createProperty("Minimum File Age")
+ .withDescription("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
+ .isRequired(true)
+ .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
+ .withDefaultValue("5 sec")
+ .build();
+ EXTENSIONAPI static constexpr auto MaximumFileAge = core::PropertyDefinitionBuilder<>::createProperty("Maximum File Age")
+ .withDescription("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
+ .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
+ .build();
+ EXTENSIONAPI static constexpr auto MinimumFileSize = core::PropertyDefinitionBuilder<>::createProperty("Minimum File Size")
+ .withDescription("The minimum size that a file must be in order to be pulled")
+ .withPropertyType(core::StandardPropertyTypes::DATA_SIZE_TYPE)
+ .build();
+ EXTENSIONAPI static constexpr auto MaximumFileSize = core::PropertyDefinitionBuilder<>::createProperty("Maximum File Size")
+ .withDescription("The maximum size that a file can be in order to be pulled")
+ .withPropertyType(core::StandardPropertyTypes::DATA_SIZE_TYPE)
+ .build();
+ EXTENSIONAPI static constexpr auto IgnoreHiddenFiles = core::PropertyDefinitionBuilder<>::createProperty("Ignore Hidden Files")
+ .withDescription("Indicates whether or not hidden files should be ignored")
+ .withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
+ .withDefaultValue("true")
+ .isRequired(true)
+ .build();
+
+ EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 10>{
+ ConnectionControllerService,
+ InputDirectory,
+ RecurseSubdirectories,
+ FileFilter,
+ PathFilter,
+ MinimumFileAge,
+ MaximumFileAge,
+ MinimumFileSize,
+ MaximumFileSize,
+ IgnoreHiddenFiles
+ };
+
+ EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "All FlowFiles that are received are routed to success"};
+ EXTENSIONAPI static constexpr auto Relationships = std::array{Success};
+
+ EXTENSIONAPI static constexpr auto Filename = core::OutputAttributeDefinition<>{"filename", { Success }, "The name of the file that was read from filesystem."};
+ EXTENSIONAPI static constexpr auto Path = core::OutputAttributeDefinition<>{"path", { Success },
+ "The path is set to the relative path of the file's directory on the remote filesystem compared to the Share root directory. "
+ "For example, for a given remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listed from smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file "
+ "then the path attribute will be set to \"DIRECTORY/sub/folder\"."};
+ EXTENSIONAPI static constexpr auto ServiceLocation = core::OutputAttributeDefinition<>{"serviceLocation", { Success },
+ "The SMB URL of the share."};
+ EXTENSIONAPI static constexpr auto LastModifiedTime = core::OutputAttributeDefinition<>{"lastModifiedTime", { Success },
+ "The timestamp of when the file's content changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ss'."};
+ EXTENSIONAPI static constexpr auto CreationTime = core::OutputAttributeDefinition<>{"creationTime", { Success },
+ "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ss'."};
+ EXTENSIONAPI static constexpr auto LastAccessTime = core::OutputAttributeDefinition<>{"lastAccessTime", { Success },
+ "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ss'."};
+
+
+ EXTENSIONAPI static constexpr auto Size = core::OutputAttributeDefinition<>{"size", { Success }, "The size of the file in bytes."};
+
+ EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 7> {Filename, Path, ServiceLocation, LastModifiedTime, CreationTime, LastAccessTime, Size };
+
+ EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+ EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+ EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN;
+ EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+
+ ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+ void initialize() override;
+ void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &session_factory) override;
+ void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+
+ private:
+ std::shared_ptr<core::FlowFile> createFlowFile(core::ProcessSession& session, const utils::ListedFile& listed_file);
+
+ std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ListSmb>::getLogger(uuid_);
+ std::filesystem::path input_directory_;
+ std::shared_ptr<SmbConnectionControllerService> smb_connection_controller_service_;
+ std::unique_ptr<minifi::utils::ListingStateManager> state_manager_;
+ bool recurse_subdirectories_ = true;
+ utils::FileFilter file_filter_{};
+};
+
+} // namespace org::apache::nifi::minifi::extensions::smb
diff --git a/extensions/smb/PutSmb.cpp b/extensions/smb/PutSmb.cpp
new file mode 100644
index 0000000..b2ad649
--- /dev/null
+++ b/extensions/smb/PutSmb.cpp
@@ -0,0 +1,94 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutSmb.h"
+#include "utils/gsl.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/OsUtils.h"
+#include "utils/file/FileWriterCallback.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::extensions::smb {
+
+void PutSmb::initialize() {
+ setSupportedProperties(Properties);
+ setSupportedRelationships(Relationships);
+}
+
+void PutSmb::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory*) {
+ gsl_Expects(context);
+ smb_connection_controller_service_ = SmbConnectionControllerService::getFromProperty(*context, PutSmb::ConnectionControllerService);
+ create_missing_dirs_ = context->getProperty<bool>(PutSmb::CreateMissingDirectories).value_or(true);
+ conflict_resolution_strategy_ = utils::parseEnumProperty<FileExistsResolutionStrategy>(*context, ConflictResolution);
+}
+
+std::filesystem::path PutSmb::getFilePath(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+ auto filename = flow_file->getAttribute(core::SpecialFlowAttribute::FILENAME).value_or(flow_file->getUUIDStr());
+ return smb_connection_controller_service_->getPath() / context.getProperty(Directory, flow_file).value_or("") / filename;
+}
+
+void PutSmb::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
+ gsl_Expects(context && session && smb_connection_controller_service_);
+
+ if (auto connection_error = smb_connection_controller_service_->validateConnection()) {
+ logger_->log_error("Couldn't establish connection to the specified network location due to %s", connection_error.message());
+ context->yield();
+ return;
+ }
+
+ auto flow_file = session->get();
+ if (!flow_file) {
+ context->yield();
+ return;
+ }
+
+ auto full_file_path = getFilePath(*context, flow_file);
+
+ if (utils::file::exists(full_file_path)) {
+ logger_->log_warn("Destination file %s exists; applying Conflict Resolution Strategy: %s", full_file_path.string(), std::string(magic_enum::enum_name(conflict_resolution_strategy_)));
+ if (conflict_resolution_strategy_ == FileExistsResolutionStrategy::fail) {
+ session->transfer(flow_file, Failure);
+ return;
+ } else if (conflict_resolution_strategy_ == FileExistsResolutionStrategy::ignore) {
+ session->transfer(flow_file, Success);
+ return;
+ }
+ }
+
+ if (!utils::file::exists(full_file_path.parent_path()) && create_missing_dirs_) {
+ logger_->log_debug("Destination directory does not exist; will attempt to create: %s", full_file_path.parent_path().string());
+ utils::file::create_dir(full_file_path.parent_path(), true);
+ }
+
+ bool success = false;
+
+ utils::FileWriterCallback file_writer_callback(full_file_path);
+ auto read_result = session->read(flow_file, std::ref(file_writer_callback));
+ if (io::isError(read_result)) {
+ logger_->log_error("Failed to write to %s", full_file_path.string());
+ success = false;
+ } else {
+ success = file_writer_callback.commit();
+ }
+
+ session->transfer(flow_file, success ? Success : Failure);
+}
+
+REGISTER_RESOURCE(PutSmb, Processor);
+
+} // namespace org::apache::nifi::minifi::extensions::smb
diff --git a/extensions/smb/PutSmb.h b/extensions/smb/PutSmb.h
new file mode 100644
index 0000000..72f2567
--- /dev/null
+++ b/extensions/smb/PutSmb.h
@@ -0,0 +1,94 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "utils/Enum.h"
+#include "SmbConnectionControllerService.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::extensions::smb {
+
+class PutSmb : public core::Processor {
+ public:
+ explicit PutSmb(std::string name, const utils::Identifier& uuid = {})
+ : core::Processor(std::move(name), uuid) {
+ }
+
+ enum class FileExistsResolutionStrategy {
+ fail,
+ replace,
+ ignore
+ };
+
+ EXTENSIONAPI static constexpr const char* Description = "Writes the contents of a FlowFile to an smb network location";
+
+ EXTENSIONAPI static constexpr auto ConnectionControllerService = core::PropertyDefinitionBuilder<>::createProperty("SMB Connection Controller Service")
+ .withDescription("Specifies the SMB connection controller service to use for connecting to the SMB server. "
+ "If the SMB share is auto-mounted to a drive letter, its recommended to use PutFile instead.")
+ .isRequired(true)
+ .withAllowedTypes<SmbConnectionControllerService>()
+ .build();
+ EXTENSIONAPI static constexpr auto Directory = core::PropertyDefinitionBuilder<>::createProperty("Directory")
+ .withDescription("The output directory to which to put files")
+ .supportsExpressionLanguage(true)
+ .withDefaultValue(".")
+ .build();
+ EXTENSIONAPI static constexpr auto ConflictResolution = core::PropertyDefinitionBuilder<magic_enum::enum_count<FileExistsResolutionStrategy>()>::createProperty("Conflict Resolution Strategy")
+ .withDescription("Indicates what should happen when a file with the same name already exists in the output directory")
+ .withDefaultValue(magic_enum::enum_name(FileExistsResolutionStrategy::fail))
+ .withAllowedValues(magic_enum::enum_names<FileExistsResolutionStrategy>())
+ .build();
+ EXTENSIONAPI static constexpr auto CreateMissingDirectories = core::PropertyDefinitionBuilder<0, 0, 1>::createProperty("Create Missing Directories")
+ .withDescription("If true, then missing destination directories will be created. If false, flowfiles are penalized and sent to failure.")
+ .withDefaultValue("true")
+ .isRequired(true)
+ .withDependentProperties({Directory.name})
+ .build();
+
+ EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 4>{ ConnectionControllerService, Directory, ConflictResolution, CreateMissingDirectories};
+
+ EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "All files are routed to success"};
+ EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "Failed files (conflict, write failure, etc.) are transferred to failure"};
+ EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure};
+
+ EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+ EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+ EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
+ EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+
+ ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+ void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
+ void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
+ void initialize() override;
+
+ private:
+ std::filesystem::path getFilePath(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file);
+ bool create_missing_dirs_ = true;
+ FileExistsResolutionStrategy conflict_resolution_strategy_ = FileExistsResolutionStrategy::fail;
+ std::shared_ptr<SmbConnectionControllerService> smb_connection_controller_service_;
+ std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PutSmb>::getLogger(uuid_);
+};
+
+} // namespace org::apache::nifi::minifi::extensions::smb
diff --git a/extensions/smb/SmbConnectionControllerService.cpp b/extensions/smb/SmbConnectionControllerService.cpp
new file mode 100644
index 0000000..f9a0857
--- /dev/null
+++ b/extensions/smb/SmbConnectionControllerService.cpp
@@ -0,0 +1,121 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SmbConnectionControllerService.h"
+#include "core/Resource.h"
+#include "utils/OsUtils.h"
+#include "utils/expected.h"
+
+namespace org::apache::nifi::minifi::extensions::smb {
+
+void SmbConnectionControllerService::initialize() {
+ setSupportedProperties(Properties);
+}
+
+void SmbConnectionControllerService::onEnable() {
+ std::string hostname;
+ std::string share;
+
+ if (!getProperty(Hostname, hostname))
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing hostname");
+
+ if (!getProperty(Share, share))
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing share");
+
+ server_path_ = "\\\\" + hostname + "\\" + share;
+
+ auto password = getProperty(Password);
+ auto username = getProperty(Username);
+
+ if (password.has_value() != username.has_value())
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Either both a username and a password, or neither of them should be provided.");
+
+ if (username.has_value())
+ credentials_.emplace(Credentials{.username = *username, .password = *password});
+ else
+ credentials_.reset();
+
+ net_resource_ = {
+ .dwType = RESOURCETYPE_DISK,
+ .lpLocalName = nullptr,
+ .lpRemoteName = server_path_.data(),
+ .lpProvider = nullptr,
+ };
+}
+
+void SmbConnectionControllerService::notifyStop() {
+ auto disconnection_result = disconnect();
+ if (!disconnection_result)
+ logger_->log_error("Error while disconnecting from SMB: %s", disconnection_result.error().message());
+}
+
+gsl::not_null<std::shared_ptr<SmbConnectionControllerService>> SmbConnectionControllerService::getFromProperty(const core::ProcessContext& context, const core::PropertyReference& property) {
+ std::shared_ptr<SmbConnectionControllerService> smb_connection_controller_service;
+ if (auto connection_controller_name = context.getProperty(property)) {
+ smb_connection_controller_service = std::dynamic_pointer_cast<SmbConnectionControllerService>(context.getControllerService(*connection_controller_name));
+ }
+ if (!smb_connection_controller_service) {
+ throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing SMB Connection Controller Service");
+ }
+ return gsl::make_not_null(smb_connection_controller_service);
+}
+
+nonstd::expected<void, std::error_code> SmbConnectionControllerService::connect() {
+ auto connection_result = WNetAddConnection2A(&net_resource_,
+ credentials_ ? credentials_->password.c_str() : nullptr,
+ credentials_ ? credentials_->username.c_str() : nullptr,
+ CONNECT_TEMPORARY);
+ if (connection_result == NO_ERROR)
+ return {};
+
+ return nonstd::make_unexpected(utils::OsUtils::windowsErrorToErrorCode(connection_result));
+}
+
+nonstd::expected<void, std::error_code> SmbConnectionControllerService::disconnect() {
+ auto disconnection_result = WNetCancelConnection2A(server_path_.c_str(), 0, true);
+ if (disconnection_result == NO_ERROR)
+ return {};
+
+ return nonstd::make_unexpected(utils::OsUtils::windowsErrorToErrorCode(disconnection_result));
+}
+
+bool SmbConnectionControllerService::isConnected() {
+ std::error_code error_code;
+ auto exists = std::filesystem::exists(server_path_, error_code);
+ if (error_code) {
+ logger_->log_debug("std::filesystem::exists(%s) failed due to %s", server_path_, error_code.message());
+ return false;
+ }
+ return exists;
+}
+
+std::error_code SmbConnectionControllerService::validateConnection() {
+ if (isConnected()) {
+ return std::error_code();
+ }
+ auto connection_result = connect();
+ if (!connection_result) {
+ return connection_result.error();
+ }
+
+ return std::error_code();
+}
+
+REGISTER_RESOURCE(SmbConnectionControllerService, ControllerService);
+
+} // namespace org::apache::nifi::minifi::extensions::smb
diff --git a/extensions/smb/SmbConnectionControllerService.h b/extensions/smb/SmbConnectionControllerService.h
new file mode 100644
index 0000000..405d869
--- /dev/null
+++ b/extensions/smb/SmbConnectionControllerService.h
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <filesystem>
+#include <string>
+#include <memory>
+
+#include "ProcessContext.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/controller/ControllerService.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Enum.h"
+#include "utils/expected.h"
+
+namespace org::apache::nifi::minifi::extensions::smb {
+
+class SmbConnectionControllerService : public core::controller::ControllerService {
+ public:
+ EXTENSIONAPI static constexpr const char* Description = "SMB Connection Controller Service";
+
+ EXTENSIONAPI static constexpr auto Hostname = core::PropertyDefinitionBuilder<>::createProperty("Hostname")
+ .withDescription("The network host to which files should be written.")
+ .isRequired(true)
+ .build();
+ EXTENSIONAPI static constexpr auto Share = core::PropertyDefinitionBuilder<>::createProperty("Share")
+ .withDescription(R"(The network share to which files should be written. This is the "first folder" after the hostname: \\hostname\[share]\dir1\dir2)")
+ .isRequired(true)
+ .build();
+ EXTENSIONAPI static constexpr auto Domain = core::PropertyDefinitionBuilder<>::createProperty("Domain")
+ .withDescription("The domain used for authentication. Optional, in most cases username and password is sufficient.")
+ .isRequired(false)
+ .build();
+ EXTENSIONAPI static constexpr auto Username = core::PropertyDefinitionBuilder<>::createProperty("Username")
+ .withDescription("The username used for authentication. If no username is set then anonymous authentication is attempted.")
+ .isRequired(false)
+ .withDependentProperties({"Password"})
+ .build();
+ EXTENSIONAPI static constexpr auto Password = core::PropertyDefinitionBuilder<>::createProperty("Password")
+ .withDescription("The password used for authentication. Required if Username is set.")
+ .isRequired(false)
+ .withDependentProperties({"Username"})
+ .build();
+
+ static constexpr auto Properties = std::array<core::PropertyReference, 5>{
+ Hostname,
+ Share,
+ Domain,
+ Username,
+ Password
+ };
+
+ EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+ ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
+
+ using ControllerService::ControllerService;
+
+ void initialize() override;
+
+ void onEnable() override;
+ void notifyStop() override;
+
+ void yield() override {}
+ bool isRunning() const override { return getState() == core::controller::ControllerServiceState::ENABLED; }
+ bool isWorkAvailable() override { return false; }
+
+ virtual std::error_code validateConnection();
+ virtual std::filesystem::path getPath() const { return server_path_; }
+
+ static gsl::not_null<std::shared_ptr<SmbConnectionControllerService>> getFromProperty(const core::ProcessContext& context, const core::PropertyReference& property);
+
+ private:
+ nonstd::expected<void, std::error_code> connect();
+ nonstd::expected<void, std::error_code> disconnect();
+ bool isConnected();
+
+ struct Credentials {
+ std::string username;
+ std::string password;
+ };
+
+ std::optional<Credentials> credentials_;
+ std::string server_path_;
+ NETRESOURCEA net_resource_;
+ std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<SmbConnectionControllerService>::getLogger(uuid_);
+};
+} // namespace org::apache::nifi::minifi::extensions::smb
diff --git a/extensions/smb/tests/CMakeLists.txt b/extensions/smb/tests/CMakeLists.txt
new file mode 100644
index 0000000..088d878
--- /dev/null
+++ b/extensions/smb/tests/CMakeLists.txt
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB SMB_TESTS "*.cpp")
+
+SET(SMB_TEST_COUNT 0)
+FOREACH(testfile ${SMB_TESTS})
+ get_filename_component(testfilename "${testfile}" NAME_WE)
+ add_executable("${testfilename}" "${testfile}")
+ target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+ target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/smb")
+ target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/libminifi/test/")
+ createTests("${testfilename}")
+ target_link_libraries(${testfilename} Catch2WithMain)
+ target_link_libraries(${testfilename} minifi-smb)
+ target_link_libraries(${testfilename} minifi-standard-processors Netapi32)
+ MATH(EXPR SMB_TEST_COUNT "${SMB_TEST_COUNT}+1")
+ add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
+ENDFOREACH()
+message("-- Finished building ${SMB_TEST_COUNT} SMB related test file(s)...")
diff --git a/extensions/smb/tests/FetchSmbTests.cpp b/extensions/smb/tests/FetchSmbTests.cpp
new file mode 100644
index 0000000..9f6c870
--- /dev/null
+++ b/extensions/smb/tests/FetchSmbTests.cpp
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "Catch.h"
+#include "FetchSmb.h"
+#include "SmbConnectionControllerService.h"
+#include "utils/MockSmbConnectionControllerService.h"
+#include "SingleProcessorTestController.h"
+#include "OsUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::extensions::smb::test {
+
+REGISTER_RESOURCE(MockSmbConnectionControllerService, ControllerService);
+
+TEST_CASE("FetchSmb invalid network path") {
+ const auto fetch_smb = std::make_shared<FetchSmb>("FetchSmb");
+ minifi::test::SingleProcessorTestController controller{fetch_smb};
+ auto smb_connection_node = controller.plan->addController("MockSmbConnectionControllerService", "smb_connection_controller_service");
+ REQUIRE(controller.plan->setProperty(smb_connection_node, SmbConnectionControllerService::Hostname, utils::OsUtils::getHostName().value_or("localhost")));
+ REQUIRE(controller.plan->setProperty(smb_connection_node, SmbConnectionControllerService::Share, "some_share_that_does_not_exist"));
+ REQUIRE(controller.plan->setProperty(fetch_smb, FetchSmb::ConnectionControllerService, "smb_connection_controller_service"));
+ const auto trigger_results = controller.trigger("", {{std::string(core::SpecialFlowAttribute::FILENAME), "a.foo"}, {std::string(core::SpecialFlowAttribute::PATH), ""}});
+ CHECK(trigger_results.at(FetchSmb::Success).empty());
+ CHECK(trigger_results.at(FetchSmb::Failure).empty());
+ CHECK(fetch_smb->isYield());
+}
+
+TEST_CASE("FetchSmb tests") {
+ const auto fetch_smb = std::make_shared<FetchSmb>("FetchSmb");
+ minifi::test::SingleProcessorTestController controller{fetch_smb};
+
+ auto smb_connection_node = controller.plan->addController("MockSmbConnectionControllerService", "smb_connection_controller_service");
+ auto mock_smb_connection_controller_service = std::dynamic_pointer_cast<MockSmbConnectionControllerService>(smb_connection_node->getControllerServiceImplementation());
+ REQUIRE(mock_smb_connection_controller_service);
+
+ constexpr std::string_view a_content = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Vivamus malesuada elit odio, sit amet viverra ante venenatis eget.";
+ constexpr std::string_view b_content = "Phasellus sed pharetra velit. Orci varius natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus.";
+ constexpr std::string_view original_content = "Morbi blandit tincidunt sem ac interdum. Aenean at mauris non augue rhoncus finibus quis vel augue.";
+
+ mock_smb_connection_controller_service->setPath(controller.createTempDirectory());
+
+ auto a_expected_attributes = mock_smb_connection_controller_service->addFile("a.foo", a_content, 5min);
+ auto b_expected_attributes = mock_smb_connection_controller_service->addFile("subdir/b.foo", b_content, 5min);
+
+ REQUIRE(controller.plan->setProperty(fetch_smb, FetchSmb::ConnectionControllerService, "smb_connection_controller_service"));
+
+ SECTION("Without Remote File property") {
+ }
+ SECTION("Remote File Property with expression language (unix separator)") {
+ REQUIRE(controller.plan->setProperty(fetch_smb, FetchSmb::RemoteFile, "${path}/${filename}"));
+ }
+ SECTION("Remote File Property with expression language (windows separator)") {
+ REQUIRE(controller.plan->setProperty(fetch_smb, FetchSmb::RemoteFile, "${path}\\${filename}"));
+ }
+
+ {
+ const auto trigger_results = controller.trigger(original_content, {{std::string(core::SpecialFlowAttribute::FILENAME), "a.foo"}, {std::string(core::SpecialFlowAttribute::PATH), ""}});
+
+ CHECK(trigger_results.at(FetchSmb::Failure).empty());
+ REQUIRE(trigger_results.at(FetchSmb::Success).size() == 1);
+ auto succeeded_flow_file = trigger_results.at(FetchSmb::Success)[0];
+
+ CHECK(controller.plan->getContent(succeeded_flow_file) == a_content);
+ CHECK_FALSE(succeeded_flow_file->getAttribute(FetchSmb::ErrorCode.name));
+ CHECK_FALSE(succeeded_flow_file->getAttribute(FetchSmb::ErrorMessage.name));
+ }
+ {
+ const auto trigger_results = controller.trigger(original_content, {{std::string(core::SpecialFlowAttribute::FILENAME), "b.foo"}, {std::string(core::SpecialFlowAttribute::PATH), "subdir"}});
+
+ CHECK(trigger_results.at(FetchSmb::Failure).empty());
+ REQUIRE(trigger_results.at(FetchSmb::Success).size() == 1);
+ auto succeeded_flow_file = trigger_results.at(FetchSmb::Success)[0];
+
+ CHECK(controller.plan->getContent(succeeded_flow_file) == b_content);
+ CHECK_FALSE(succeeded_flow_file->getAttribute(FetchSmb::ErrorCode.name));
+ CHECK_FALSE(succeeded_flow_file->getAttribute(FetchSmb::ErrorMessage.name));
+ }
+ {
+ const auto trigger_results = controller.trigger(original_content, {{std::string(core::SpecialFlowAttribute::FILENAME), "c.foo"}, {std::string(core::SpecialFlowAttribute::PATH), "subdir"}});
+
+ CHECK(trigger_results.at(FetchSmb::Success).empty());
+ REQUIRE(trigger_results.at(FetchSmb::Failure).size() == 1);
+ auto failed_flow_file = trigger_results.at(FetchSmb::Failure)[0];
+
+ CHECK(controller.plan->getContent(failed_flow_file) == original_content);
+ CHECK(failed_flow_file->getAttribute(FetchSmb::ErrorCode.name) == "2");
+ CHECK(failed_flow_file->getAttribute(FetchSmb::ErrorMessage.name) == "Error opening file: No such file or directory");
+ }
+}
+
+} // namespace org::apache::nifi::minifi::extensions::smb::test
diff --git a/extensions/smb/tests/ListAndFetchSmbTests.cpp b/extensions/smb/tests/ListAndFetchSmbTests.cpp
new file mode 100644
index 0000000..a0481c1
--- /dev/null
+++ b/extensions/smb/tests/ListAndFetchSmbTests.cpp
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "Catch.h"
+#include "ListSmb.h"
+#include "FetchSmb.h"
+#include "utils/MockSmbConnectionControllerService.h"
+#include "ReadFromFlowFileTestProcessor.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::extensions::smb::test {
+
+REGISTER_RESOURCE(MockSmbConnectionControllerService, ControllerService);
+
+using minifi::processors::ReadFromFlowFileTestProcessor;
+
+TEST_CASE("ListSmb and FetchSmb work together") {
+ TestController controller;
+ auto plan = controller.createPlan();
+ auto list_smb = std::dynamic_pointer_cast<ListSmb>(plan->addProcessor("ListSmb", "list_smb"));
+ auto fetch_smb = std::dynamic_pointer_cast<FetchSmb>(plan->addProcessor("FetchSmb", "fetch_smb"));
+ auto read_from_success_relationship = std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_success_relationship"));
+ auto read_from_failure_relationship = std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_failure_relationship"));
+
+ plan->addConnection(list_smb, ListSmb::Success, fetch_smb);
+
+ plan->addConnection(fetch_smb, FetchSmb::Success, read_from_success_relationship);
+ plan->addConnection(fetch_smb, FetchSmb::Failure, read_from_failure_relationship);
+
+ auto smb_connection_node = plan->addController("MockSmbConnectionControllerService", "smb_connection_controller_service");
+ auto mock_smb_connection_controller_service = std::dynamic_pointer_cast<MockSmbConnectionControllerService>(smb_connection_node->getControllerServiceImplementation());
+ REQUIRE(mock_smb_connection_controller_service);
+
+ plan->setProperty(list_smb, ListSmb::ConnectionControllerService, "smb_connection_controller_service");
+ plan->setProperty(fetch_smb, FetchSmb::ConnectionControllerService, "smb_connection_controller_service");
+
+ read_from_success_relationship->setAutoTerminatedRelationships(std::array<core::Relationship, 1>{ReadFromFlowFileTestProcessor::Success});
+ read_from_failure_relationship->setAutoTerminatedRelationships(std::array<core::Relationship, 1>{ReadFromFlowFileTestProcessor::Success});
+
+ mock_smb_connection_controller_service->setPath(controller.createTempDirectory());
+ constexpr std::string_view content = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Vivamus malesuada elit odio, sit amet viverra ante venenatis eget.";
+ mock_smb_connection_controller_service->addFile("input_dir/sub_dir/b.foo", content, 5min);
+
+ SECTION("With Input Directory") {
+ plan->setProperty(list_smb, ListSmb::InputDirectory, "input_dir");
+ }
+
+ SECTION("Without Input Directory") {
+ }
+
+ controller.runSession(plan);
+ CHECK(read_from_success_relationship->numberOfFlowFilesRead() == 1);
+ CHECK(read_from_failure_relationship->numberOfFlowFilesRead() == 0);
+}
+
+} // namespace org::apache::nifi::minifi::extensions::smb::test
diff --git a/extensions/smb/tests/ListSmbTests.cpp b/extensions/smb/tests/ListSmbTests.cpp
new file mode 100644
index 0000000..d94f40b
--- /dev/null
+++ b/extensions/smb/tests/ListSmbTests.cpp
@@ -0,0 +1,146 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "Catch.h"
+#include "ListSmb.h"
+#include "Utils.h"
+#include "utils/MockSmbConnectionControllerService.h"
+#include "SingleProcessorTestController.h"
+#include "range/v3/algorithm/count_if.hpp"
+#include "range/v3/algorithm/find_if.hpp"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::extensions::smb::test {
+
+REGISTER_RESOURCE(MockSmbConnectionControllerService, ControllerService);
+
+TEST_CASE("ListSmb invalid network path") {
+ const auto list_smb = std::make_shared<ListSmb>("ListSmb");
+ minifi::test::SingleProcessorTestController controller{list_smb};
+ auto smb_connection_node = controller.plan->addController("MockSmbConnectionControllerService", "smb_connection_controller_service");
+ REQUIRE(controller.plan->setProperty(smb_connection_node, SmbConnectionControllerService::Hostname, utils::OsUtils::getHostName().value_or("localhost")));
+ REQUIRE(controller.plan->setProperty(smb_connection_node, SmbConnectionControllerService::Share, "some_share_that_does_not_exists"));
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::ConnectionControllerService, "smb_connection_controller_service"));
+ const auto trigger_results = controller.trigger();
+ CHECK(trigger_results.at(ListSmb::Success).empty());
+ CHECK(list_smb->isYield());
+}
+
+bool checkForFlowFileWithAttributes(const std::vector<std::shared_ptr<core::FlowFile>>& result, ListSmbExpectedAttributes expected_attributes) {
+ auto matching_flow_file = ranges::find_if(result, [&](const auto& flow_file) { return flow_file->getAttribute(core::SpecialFlowAttribute::FILENAME) == expected_attributes.expected_filename; });
+ if (matching_flow_file == result.end()) {
+ return false;
+ }
+ expected_attributes.checkAttributes(**matching_flow_file);
+ return true;
+}
+
+TEST_CASE("ListSmb tests") {
+ const auto list_smb = std::make_shared<ListSmb>("ListSmb");
+ minifi::test::SingleProcessorTestController controller{list_smb};
+
+ auto smb_connection_node = controller.plan->addController("MockSmbConnectionControllerService", "smb_connection_controller_service");
+ auto mock_smb_connection_controller_service = std::dynamic_pointer_cast<MockSmbConnectionControllerService>(smb_connection_node->getControllerServiceImplementation());
+ REQUIRE(mock_smb_connection_controller_service);
+ mock_smb_connection_controller_service->setPath(controller.createTempDirectory());
+
+ auto a_expected_attributes = mock_smb_connection_controller_service->addFile("a.foo", std::string(10_KiB, 'a'), 5min);
+ auto b_expected_attributes = mock_smb_connection_controller_service->addFile("b.foo", std::string(13_KiB, 'b'), 1h);
+ auto c_expected_attributes = mock_smb_connection_controller_service->addFile("c.bar", std::string(1_KiB, 'c'), 2h);
+ auto d_expected_attributes = mock_smb_connection_controller_service->addFile(std::filesystem::path("subdir") / "some" / "d.foo", std::string(100, 'd'), 10min);
+ auto e_expected_attributes = mock_smb_connection_controller_service->addFile(std::filesystem::path("subdir2") /"e.foo", std::string(1, 'e'), 10s);
+ auto f_expected_attributes = mock_smb_connection_controller_service->addFile(std::filesystem::path("third") / "f.bar", std::string(50_KiB, 'f'), 30min);
+ auto g_expected_attributes = mock_smb_connection_controller_service->addFile("g.foo", std::string(50_KiB, 'f'), 30min);
+ auto hide_file_error = minifi::test::utils::hide_file(mock_smb_connection_controller_service->getPath() / "g.foo");
+ REQUIRE_FALSE(hide_file_error);
+
+ REQUIRE((a_expected_attributes && b_expected_attributes && c_expected_attributes && d_expected_attributes && e_expected_attributes && f_expected_attributes && g_expected_attributes));
+
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::ConnectionControllerService, "smb_connection_controller_service"));
+
+ SECTION("FileFilter without subdirs") {
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::FileFilter, ".*\\.foo"));
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::RecurseSubdirectories, "false"));
+ const auto trigger_results = controller.trigger();
+ CHECK(trigger_results.at(ListSmb::Success).size() == 2);
+ CHECK_FALSE(list_smb->isYield());
+ CHECK(checkForFlowFileWithAttributes(trigger_results.at(ListSmb::Success), *a_expected_attributes));
+ CHECK(checkForFlowFileWithAttributes(trigger_results.at(ListSmb::Success), *b_expected_attributes));
+ }
+
+ SECTION("Input directory") {
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::InputDirectory, "subdir"));
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::RecurseSubdirectories, "true"));
+ const auto trigger_results = controller.trigger();
+ CHECK(trigger_results.at(ListSmb::Success).size() == 1);
+ CHECK_FALSE(list_smb->isYield());
+ CHECK(checkForFlowFileWithAttributes(trigger_results.at(ListSmb::Success), *d_expected_attributes));
+ }
+
+ SECTION("PathFilter and FileFilter") {
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::FileFilter, ".*\\.foo"));
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::RecurseSubdirectories, "true"));
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::PathFilter, "subdir.*"));
+ const auto trigger_results = controller.trigger();
+ CHECK(trigger_results.at(ListSmb::Success).size() == 2);
+ CHECK_FALSE(list_smb->isYield());
+ CHECK(checkForFlowFileWithAttributes(trigger_results.at(ListSmb::Success), *d_expected_attributes));
+ CHECK(checkForFlowFileWithAttributes(trigger_results.at(ListSmb::Success), *e_expected_attributes));
+ }
+
+ SECTION("Subdirs with age restriction") {
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::MinimumFileAge, "3min"));
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::MaximumFileAge, "59min"));
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::RecurseSubdirectories, "true"));
+ const auto trigger_results = controller.trigger();
+ CHECK(trigger_results.at(ListSmb::Success).size() == 3);
+ CHECK_FALSE(list_smb->isYield());
+ CHECK(checkForFlowFileWithAttributes(trigger_results.at(ListSmb::Success), *a_expected_attributes));
+ CHECK(checkForFlowFileWithAttributes(trigger_results.at(ListSmb::Success), *d_expected_attributes));
+ }
+
+ SECTION("Subdirs with size restriction") {
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::MinimumFileSize, "2 KB"));
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::MaximumFileSize, "20 KB"));
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::RecurseSubdirectories, "true"));
+ const auto trigger_results = controller.trigger();
+ CHECK(trigger_results.at(ListSmb::Success).size() == 2);
+ CHECK_FALSE(list_smb->isYield());
+ CHECK(checkForFlowFileWithAttributes(trigger_results.at(ListSmb::Success), *a_expected_attributes));
+ CHECK(checkForFlowFileWithAttributes(trigger_results.at(ListSmb::Success), *b_expected_attributes));
+ }
+
+ SECTION("Dont ignore hidden files") {
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::IgnoreHiddenFiles, "false"));
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::FileFilter, ".*\\.foo"));
+ REQUIRE(controller.plan->setProperty(list_smb, ListSmb::RecurseSubdirectories, "false"));
+ const auto trigger_results = controller.trigger();
+ CHECK(trigger_results.at(ListSmb::Success).size() == 3);
+ CHECK_FALSE(list_smb->isYield());
+ CHECK(checkForFlowFileWithAttributes(trigger_results.at(ListSmb::Success), *a_expected_attributes));
+ CHECK(checkForFlowFileWithAttributes(trigger_results.at(ListSmb::Success), *b_expected_attributes));
+ CHECK(checkForFlowFileWithAttributes(trigger_results.at(ListSmb::Success), *g_expected_attributes));
+ }
+
+ const auto second_trigger = controller.trigger();
+ CHECK(second_trigger.at(ListSmb::Success).empty());
+ CHECK(list_smb->isYield());
+}
+
+} // namespace org::apache::nifi::minifi::extensions::smb::test
diff --git a/extensions/smb/tests/PutSmbTests.cpp b/extensions/smb/tests/PutSmbTests.cpp
new file mode 100644
index 0000000..14f5ed6
--- /dev/null
+++ b/extensions/smb/tests/PutSmbTests.cpp
@@ -0,0 +1,184 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "Catch.h"
+#include "PutSmb.h"
+#include "utils/MockSmbConnectionControllerService.h"
+#include "SingleProcessorTestController.h"
+#include "range/v3/algorithm/count_if.hpp"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::extensions::smb::test {
+
+REGISTER_RESOURCE(MockSmbConnectionControllerService, ControllerService);
+
+std::string checkFileContent(const std::filesystem::path& path) {
+ gsl_Expects(std::filesystem::exists(path));
+ std::ifstream if_stream(path);
+ return {std::istreambuf_iterator<char>(if_stream), std::istreambuf_iterator<char>()};
+}
+
+TEST_CASE("PutSmb invalid network path") {
+ const auto put_smb = std::make_shared<PutSmb>("PutSmb");
+ minifi::test::SingleProcessorTestController controller{put_smb};
+ auto smb_connection_node = controller.plan->addController("MockSmbConnectionControllerService", "smb_connection_controller_service");
+ REQUIRE(controller.plan->setProperty(smb_connection_node, SmbConnectionControllerService::Hostname, utils::OsUtils::getHostName().value_or("localhost")));
+ REQUIRE(controller.plan->setProperty(smb_connection_node, SmbConnectionControllerService::Share, "some_share_that_does_not_exists"));
+ REQUIRE(controller.plan->setProperty(put_smb, PutSmb::ConnectionControllerService, "smb_connection_controller_service"));
+ const auto trigger_results = controller.trigger("", {{std::string(core::SpecialFlowAttribute::FILENAME), "a.foo"}, {std::string(core::SpecialFlowAttribute::PATH), ""}});
+ CHECK(trigger_results.at(PutSmb::Success).empty());
+ CHECK(trigger_results.at(PutSmb::Failure).empty());
+ CHECK(put_smb->isYield());
+}
+
+TEST_CASE("PutSmb conflict resolution test") {
+ const auto put_smb = std::make_shared<PutSmb>("PutSmb");
+ minifi::test::SingleProcessorTestController controller{put_smb};
+
+ auto temp_directory = controller.createTempDirectory();
+ auto smb_connection_node = controller.plan->addController("MockSmbConnectionControllerService", "smb_connection_controller_service");
+ auto mock_smb_connection_controller_service = std::dynamic_pointer_cast<MockSmbConnectionControllerService>(smb_connection_node->getControllerServiceImplementation());
+ REQUIRE(mock_smb_connection_controller_service);
+ mock_smb_connection_controller_service->setPath(temp_directory);
+
+ controller.plan->setProperty(put_smb, PutSmb::ConnectionControllerService, "smb_connection_controller_service");
+
+ SECTION("Replace") {
+ controller.plan->setProperty(put_smb, PutSmb::ConflictResolution, magic_enum::enum_name(PutSmb::FileExistsResolutionStrategy::replace));
+
+ std::string file_name = "my_file.txt";
+
+ CHECK_FALSE(std::filesystem::exists(temp_directory / file_name));
+
+ const auto first_trigger_results = controller.trigger("alpha", {{std::string(core::SpecialFlowAttribute::FILENAME), file_name}});
+
+ CHECK(first_trigger_results.at(PutSmb::Failure).empty());
+ CHECK(first_trigger_results.at(PutSmb::Success).size() == 1);
+
+ CHECK(std::filesystem::exists(temp_directory / file_name));
+ CHECK(checkFileContent(temp_directory / file_name) == "alpha");
+
+ const auto second_trigger_results = controller.trigger("beta", {{std::string(core::SpecialFlowAttribute::FILENAME), file_name}});
+
+ CHECK(second_trigger_results.at(PutSmb::Failure).empty());
+ CHECK(second_trigger_results.at(PutSmb::Success).size() == 1);
+
+ CHECK(std::filesystem::exists(temp_directory / file_name));
+ CHECK(checkFileContent(temp_directory / file_name) == "beta");
+ }
+
+ SECTION("Ignore") {
+ controller.plan->setProperty(put_smb, PutSmb::ConflictResolution, magic_enum::enum_name(PutSmb::FileExistsResolutionStrategy::ignore));
+
+ std::string file_name = "my_file.txt";
+
+ CHECK_FALSE(std::filesystem::exists(temp_directory / file_name));
+
+ const auto first_trigger_results = controller.trigger("alpha", {{std::string(core::SpecialFlowAttribute::FILENAME), file_name}});
+
+ CHECK(first_trigger_results.at(PutSmb::Failure).empty());
+ CHECK(first_trigger_results.at(PutSmb::Success).size() == 1);
+
+ CHECK(std::filesystem::exists(temp_directory / file_name));
+ CHECK(checkFileContent(temp_directory / file_name) == "alpha");
+
+ const auto second_trigger_results = controller.trigger("beta", {{std::string(core::SpecialFlowAttribute::FILENAME), file_name}});
+
+ CHECK(second_trigger_results.at(PutSmb::Failure).empty());
+ CHECK(second_trigger_results.at(PutSmb::Success).size() == 1);
+
+ CHECK(std::filesystem::exists(temp_directory / file_name));
+ CHECK(checkFileContent(temp_directory / file_name) == "alpha");
+ }
+
+ SECTION("Fail") {
+ controller.plan->setProperty(put_smb, PutSmb::ConflictResolution, magic_enum::enum_name(PutSmb::FileExistsResolutionStrategy::fail));
+
+ std::string file_name = "my_file.txt";
+
+ CHECK_FALSE(std::filesystem::exists(temp_directory / file_name));
+
+ const auto first_trigger_results = controller.trigger("alpha", {{std::string(core::SpecialFlowAttribute::FILENAME), file_name}});
+
+ CHECK(first_trigger_results.at(PutSmb::Failure).empty());
+ CHECK(first_trigger_results.at(PutSmb::Success).size() == 1);
+
+ CHECK(std::filesystem::exists(temp_directory / file_name));
+ CHECK(checkFileContent(temp_directory / file_name) == "alpha");
+
+ const auto second_trigger_results = controller.trigger("beta", {{std::string(core::SpecialFlowAttribute::FILENAME), file_name}});
+
+ CHECK(second_trigger_results.at(PutSmb::Failure).size() == 1);
+ CHECK(second_trigger_results.at(PutSmb::Success).empty());
+
+ CHECK(std::filesystem::exists(temp_directory / file_name));
+ CHECK(checkFileContent(temp_directory / file_name) == "alpha");
+ }
+}
+
+TEST_CASE("PutSmb create missing dirs test") {
+ const auto put_smb = std::make_shared<PutSmb>("PutSmb");
+ minifi::test::SingleProcessorTestController controller{put_smb};
+
+ auto temp_directory = controller.createTempDirectory();
+ auto smb_connection_node = controller.plan->addController("MockSmbConnectionControllerService", "smb_connection_controller_service");
+ auto mock_smb_connection_controller_service = std::dynamic_pointer_cast<MockSmbConnectionControllerService>(smb_connection_node->getControllerServiceImplementation());
+ REQUIRE(mock_smb_connection_controller_service);
+ mock_smb_connection_controller_service->setPath(temp_directory);
+
+ controller.plan->setProperty(put_smb, PutSmb::ConnectionControllerService, "smb_connection_controller_service");
+ controller.plan->setProperty(put_smb, PutSmb::Directory, "a/b");
+
+ SECTION("Create missing dirs") {
+ controller.plan->setProperty(put_smb, PutSmb::CreateMissingDirectories, "true");
+ std::string file_name = "my_file.txt";
+
+ auto expected_path = temp_directory / "a" / "b" / file_name;
+
+ CHECK_FALSE(std::filesystem::exists(expected_path));
+
+ const auto first_trigger_results = controller.trigger("alpha", {{std::string(core::SpecialFlowAttribute::FILENAME), file_name}});
+
+ CHECK(first_trigger_results.at(PutSmb::Failure).empty());
+ CHECK(first_trigger_results.at(PutSmb::Success).size() == 1);
+
+ REQUIRE(std::filesystem::exists(expected_path));
+ CHECK(checkFileContent(expected_path) == "alpha");
+ }
+
+ SECTION("Don't create missing dirs") {
+ controller.plan->setProperty(put_smb, PutSmb::CreateMissingDirectories, "false");
+ std::string file_name = "my_file.txt";
+
+ auto expected_path = temp_directory / "a" / "b" / file_name;
+
+ CHECK_FALSE(std::filesystem::exists(expected_path));
+
+ const auto first_trigger_results = controller.trigger("alpha", {{std::string(core::SpecialFlowAttribute::FILENAME), file_name}});
+
+ CHECK(first_trigger_results.at(PutSmb::Failure).size() == 1);
+ CHECK(first_trigger_results.at(PutSmb::Success).empty());
+
+ CHECK_FALSE(std::filesystem::exists(expected_path));
+ }
+}
+
+
+
+} // namespace org::apache::nifi::minifi::extensions::smb::test
diff --git a/extensions/smb/tests/SmbConnectionControllerServiceTests.cpp b/extensions/smb/tests/SmbConnectionControllerServiceTests.cpp
new file mode 100644
index 0000000..d0aadd4
--- /dev/null
+++ b/extensions/smb/tests/SmbConnectionControllerServiceTests.cpp
@@ -0,0 +1,71 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "Catch.h"
+#include "SmbConnectionControllerService.h"
+#include "utils/TempSmbShare.h"
+
+namespace org::apache::nifi::minifi::extensions::smb::test {
+
+struct SmbConnectionControllerServiceFixture {
+ SmbConnectionControllerServiceFixture() = default;
+
+ TestController test_controller_{};
+ std::shared_ptr<TestPlan> plan_ = test_controller_.createPlan();
+ std::shared_ptr<minifi::core::controller::ControllerServiceNode> smb_connection_node_ = plan_->addController("SmbConnectionControllerService", "smb_connection_controller_service");
+ std::shared_ptr<SmbConnectionControllerService> smb_connection_ = std::dynamic_pointer_cast<SmbConnectionControllerService>(smb_connection_node_->getControllerServiceImplementation());
+};
+
+
+TEST_CASE_METHOD(SmbConnectionControllerServiceFixture, "SmbConnectionControllerService onEnable throws when empty") {
+ REQUIRE_THROWS(plan_->finalize());
+}
+
+TEST_CASE_METHOD(SmbConnectionControllerServiceFixture, "SmbConnectionControllerService anonymous connection") {
+ auto temp_directory = test_controller_.createTempDirectory();
+ auto share_local_name = temp_directory.filename().wstring();
+
+ auto temp_smb_share = TempSmbShare::create(share_local_name, temp_directory.wstring());
+ if (!temp_smb_share && temp_smb_share.error() == std::error_code(5, std::system_category())) {
+ SKIP("SmbConnectionControllerService tests needs administrator privileges");
+ }
+
+
+ SECTION("Valid share") {
+ plan_->setProperty(smb_connection_node_, SmbConnectionControllerService::Hostname, "localhost");
+ plan_->setProperty(smb_connection_node_, SmbConnectionControllerService::Share, minifi::utils::OsUtils::wideStringToString(share_local_name));
+
+ REQUIRE_NOTHROW(plan_->finalize());
+
+ auto connection_error = smb_connection_->validateConnection();
+ CHECK_FALSE(connection_error);
+ }
+
+ SECTION("Invalid share") {
+ plan_->setProperty(smb_connection_node_, SmbConnectionControllerService::Hostname, "localhost");
+ plan_->setProperty(smb_connection_node_, SmbConnectionControllerService::Share, "invalid_share_name");
+
+ REQUIRE_NOTHROW(plan_->finalize());
+
+ auto connection_error = smb_connection_->validateConnection();
+ CHECK(connection_error);
+ }
+}
+
+} // namespace org::apache::nifi::minifi::extensions::smb::test
diff --git a/extensions/smb/tests/utils/MockSmbConnectionControllerService.h b/extensions/smb/tests/utils/MockSmbConnectionControllerService.h
new file mode 100644
index 0000000..6e85b00
--- /dev/null
+++ b/extensions/smb/tests/utils/MockSmbConnectionControllerService.h
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <utility>
+#include <string>
+#include "../../SmbConnectionControllerService.h"
+#include "core/controller/ControllerService.h"
+#include "utils/OsUtils.h"
+#include "utils/file/FileUtils.h"
+#include "ListSmb.h"
+#include "Catch.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::extensions::smb::test {
+
+struct ListSmbExpectedAttributes {
+ std::string expected_filename;
+ std::string expected_path;
+ std::string expected_service_location;
+ std::chrono::system_clock::time_point expected_last_modified_time;
+ std::chrono::system_clock::time_point expected_creation_time;
+ std::chrono::system_clock::time_point expected_last_access_time;
+ std::string expected_size;
+
+ void checkAttributes(core::FlowFile& flow_file) {
+ CHECK(flow_file.getAttribute(ListSmb::Filename.name) == expected_filename);
+ CHECK(flow_file.getAttribute(ListSmb::Path.name) == expected_path);
+ CHECK(flow_file.getAttribute(ListSmb::ServiceLocation.name) == expected_service_location);
+ auto last_modified_time_from_attribute = utils::timeutils::parseDateTimeStr(*flow_file.getAttribute(ListSmb::LastModifiedTime.name));
+ auto creation_time_from_attribute = utils::timeutils::parseDateTimeStr(*flow_file.getAttribute(ListSmb::CreationTime.name));
+ auto last_access_time_from_attribute = utils::timeutils::parseDateTimeStr(*flow_file.getAttribute(ListSmb::LastAccessTime.name));
+
+ CHECK(std::chrono::abs(expected_last_modified_time - *last_modified_time_from_attribute) < 5s);
+ CHECK(std::chrono::abs(expected_creation_time - *creation_time_from_attribute) < 5s);
+ CHECK(std::chrono::abs(expected_last_access_time - *last_access_time_from_attribute) < 5s);
+ CHECK(flow_file.getAttribute(ListSmb::Size.name) == expected_size);
+ }
+};
+
+class MockSmbConnectionControllerService : public SmbConnectionControllerService {
+ public:
+ using SmbConnectionControllerService::SmbConnectionControllerService;
+ EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+ ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
+
+ void onEnable() override {}
+ void notifyStop() override {}
+
+ std::error_code validateConnection() override {
+ if (server_path_)
+ return {};
+ return std::make_error_code(std::errc::not_connected);
+ }
+ std::filesystem::path getPath() const override {
+ gsl_Expects(server_path_);
+ return *server_path_;
+ }
+
+ void setPath(std::filesystem::path path) { server_path_ = std::move(path);}
+
+ nonstd::expected<ListSmbExpectedAttributes, std::error_code> addFile(const std::filesystem::path& relative_path,
+ std::string_view content,
+ std::chrono::file_clock::duration age) {
+ auto full_path = getPath() / relative_path;
+ std::filesystem::create_directories(full_path.parent_path());
+ {
+ std::ofstream out_file(full_path, std::ios::binary | std::ios::out);
+ if (!out_file.is_open())
+ return nonstd::make_unexpected(std::make_error_code(std::errc::bad_file_descriptor));
+ out_file << content;
+ }
+ auto current_time = std::chrono::system_clock::now();
+ auto last_write_time_error = utils::file::set_last_write_time(full_path, minifi::utils::file::from_sys(current_time) - age);
+ if (!last_write_time_error)
+ return nonstd::make_unexpected(std::make_error_code(std::errc::bad_file_descriptor));
+ auto path = relative_path.parent_path().empty() ? (std::filesystem::path(".") / "").string() : (relative_path.parent_path() / "").string();
+ return ListSmbExpectedAttributes{
+ .expected_filename = relative_path.filename().string(),
+ .expected_path = path,
+ .expected_service_location = server_path_->string(),
+ .expected_last_modified_time = current_time-age,
+ .expected_creation_time = current_time,
+ .expected_last_access_time = current_time,
+ .expected_size = fmt::format("{}", content.size())};
+ }
+
+ private:
+ std::optional<std::filesystem::path> server_path_ = std::nullopt;
+};
+} // namespace org::apache::nifi::minifi::extensions::smb::test
diff --git a/extensions/smb/tests/utils/TempSmbShare.h b/extensions/smb/tests/utils/TempSmbShare.h
new file mode 100644
index 0000000..5024a0d
--- /dev/null
+++ b/extensions/smb/tests/utils/TempSmbShare.h
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <filesystem>
+#include <utility>
+#include <string>
+#include "windows.h"
+#include "lm.h"
+#include "utils/OsUtils.h"
+#include "utils/expected.h"
+#include "TestUtils.h"
+#include "ListSmb.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::extensions::smb::test {
+
+class TempSmbShare {
+ public:
+ TempSmbShare(TempSmbShare&& other) = default;
+
+ ~TempSmbShare() {
+ if (!net_name_.empty())
+ NetShareDel(nullptr, net_name_.data(), 0);
+ }
+
+ static nonstd::expected<TempSmbShare, std::error_code> create(std::wstring net_name, std::wstring path) {
+ std::wstring remark = L"SMB share to test SMB capabilities of minifi";
+ SHARE_INFO_502 share_info = {
+ .shi502_netname = net_name.data(),
+ .shi502_type = STYPE_DISKTREE,
+ .shi502_remark = remark.data(),
+ .shi502_permissions = ACCESS_ALL,
+ .shi502_max_uses = static_cast<DWORD>(-1),
+ .shi502_current_uses = 0,
+ .shi502_path = path.data(),
+ .shi502_passwd = nullptr,
+ .shi502_reserved = 0,
+ .shi502_security_descriptor = nullptr,
+ };
+
+ DWORD netshare_result = NetShareAdd(nullptr, 502, reinterpret_cast<LPBYTE>(&share_info), nullptr);
+ if (netshare_result == NERR_Success) {
+ return TempSmbShare(std::move(net_name), std::move(path));
+ }
+ return nonstd::make_unexpected(utils::OsUtils::windowsErrorToErrorCode(netshare_result));
+ }
+
+ std::filesystem::path getPath() const {
+ return path_;
+ }
+
+ private:
+ TempSmbShare(std::wstring net_name, std::wstring path) : net_name_(std::move(net_name)), path_(std::move(path)) {}
+
+ std::wstring net_name_;
+ std::wstring path_;
+};
+
+} // namespace org::apache::nifi::minifi::extensions::smb::test
diff --git a/extensions/standard-processors/processors/FetchFile.cpp b/extensions/standard-processors/processors/FetchFile.cpp
index c8d9916..8b0e750 100644
--- a/extensions/standard-processors/processors/FetchFile.cpp
+++ b/extensions/standard-processors/processors/FetchFile.cpp
@@ -21,7 +21,7 @@
#include <utility>
#include "utils/ProcessorConfigUtils.h"
-#include "utils/FileReaderCallback.h"
+#include "utils/file/FileReaderCallback.h"
#include "utils/file/FileUtils.h"
#include "core/Resource.h"
@@ -52,9 +52,9 @@
return file_to_fetch_path;
}
- flow_file->getAttribute("absolute.path", file_to_fetch_path);
+ flow_file->getAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, file_to_fetch_path);
std::string filename;
- flow_file->getAttribute("filename", filename);
+ flow_file->getAttribute(core::SpecialFlowAttribute::FILENAME, filename);
return std::filesystem::path(file_to_fetch_path) / filename;
}
diff --git a/extensions/standard-processors/processors/GetFile.cpp b/extensions/standard-processors/processors/GetFile.cpp
index 8cee464..75b7d7c 100644
--- a/extensions/standard-processors/processors/GetFile.cpp
+++ b/extensions/standard-processors/processors/GetFile.cpp
@@ -28,7 +28,7 @@
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "core/TypedValues.h"
-#include "utils/FileReaderCallback.h"
+#include "utils/file/FileReaderCallback.h"
#include "utils/RegexUtils.h"
using namespace std::literals::chrono_literals;
diff --git a/extensions/standard-processors/processors/ListFile.cpp b/extensions/standard-processors/processors/ListFile.cpp
index 724eade..1309265 100644
--- a/extensions/standard-processors/processors/ListFile.cpp
+++ b/extensions/standard-processors/processors/ListFile.cpp
@@ -47,122 +47,67 @@
context->getProperty(RecurseSubdirectories, recurse_subdirectories_);
std::string value;
- if (context->getProperty(FileFilter, value) && !value.empty()) {
- file_filter_ = std::regex(value);
+ if (context->getProperty(FileFilter.name, value) && !value.empty()) {
+ file_filter_.filename_filter = std::regex(value);
}
- if (recurse_subdirectories_ && context->getProperty(PathFilter, value) && !value.empty()) {
- path_filter_ = std::regex(value);
+ if (recurse_subdirectories_ && context->getProperty(PathFilter.name, value) && !value.empty()) {
+ file_filter_.path_filter = std::regex(value);
}
if (auto minimum_file_age = context->getProperty<core::TimePeriodValue>(MinimumFileAge)) {
- minimum_file_age_ = minimum_file_age->getMilliseconds();
+ file_filter_.minimum_file_age = minimum_file_age->getMilliseconds();
}
- if (auto maximum_file_age = context->getProperty(MaximumFileAge) | utils::andThen(&core::TimePeriodValue::fromString)) {
- maximum_file_age_ = maximum_file_age->getMilliseconds();
+ if (auto maximum_file_age = context->getProperty<core::TimePeriodValue>(MaximumFileAge)) {
+ file_filter_.maximum_file_age = maximum_file_age->getMilliseconds();
}
- uint64_t int_value = 0;
- if (context->getProperty(MinimumFileSize, value) && !value.empty() && core::Property::StringToInt(value, int_value)) {
- minimum_file_size_ = int_value;
+ if (auto minimum_file_size = context->getProperty<core::DataSizeValue>(MinimumFileSize)) {
+ file_filter_.minimum_file_size = minimum_file_size->getValue();
}
- if (context->getProperty(MaximumFileSize, value) && !value.empty() && core::Property::StringToInt(value, int_value)) {
- maximum_file_size_ = int_value;
+ if (auto maximum_file_size = context->getProperty<core::DataSizeValue>(MaximumFileSize)) {
+ file_filter_.maximum_file_size = maximum_file_size->getValue();
}
- context->getProperty(IgnoreHiddenFiles, ignore_hidden_files_);
+ context->getProperty(IgnoreHiddenFiles.name, file_filter_.ignore_hidden_files);
}
-bool ListFile::fileMatchesFilters(const ListedFile& listed_file) {
- if (ignore_hidden_files_ && utils::file::FileUtils::is_hidden(listed_file.full_file_path)) {
- logger_->log_debug("File '%s' is hidden so it will not be listed", listed_file.full_file_path.string());
- return false;
- }
-
- if (file_filter_) {
- const auto file_name = listed_file.full_file_path.filename();
-
- if (!std::regex_match(file_name.string(), *file_filter_)) {
- logger_->log_debug("File '%s' does not match file filter so it will not be listed", listed_file.full_file_path.string());
- return false;
- }
- }
-
- if (path_filter_) {
- const auto relative_path = std::filesystem::relative(listed_file.full_file_path.parent_path(), input_directory_);
- if (!std::regex_match(relative_path.string(), *path_filter_)) {
- logger_->log_debug("Relative path '%s' does not match path filter so file '%s' will not be listed", relative_path.string(), listed_file.full_file_path.string());
- return false;
- }
- }
-
- if (minimum_file_age_ || maximum_file_age_) {
- const auto file_age = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - listed_file.getLastModified());
-
- if (minimum_file_age_ && file_age < *minimum_file_age_) {
- logger_->log_debug("File '%s' does not meet the minimum file age requirement so it will not be listed", listed_file.full_file_path.string());
- return false;
- }
-
- if (maximum_file_age_ && file_age > *maximum_file_age_) {
- logger_->log_debug("File '%s' does not meet the maximum file age requirement so it will not be listed", listed_file.full_file_path.string());
- return false;
- }
- }
-
- if (minimum_file_size_ || maximum_file_size_) {
- const auto file_size = utils::file::file_size(listed_file.full_file_path);
-
- if (minimum_file_size_ && file_size < *minimum_file_size_) {
- logger_->log_debug("File '%s' does not meet the minimum file size requirement so it will not be listed", listed_file.full_file_path.string());
- return false;
- }
-
- if (maximum_file_size_ && *maximum_file_size_ < file_size) {
- logger_->log_debug("File '%s' does not meet the maximum file size requirement so it will not be listed", listed_file.full_file_path.string());
- return false;
- }
- }
-
- return true;
-}
-
-std::shared_ptr<core::FlowFile> ListFile::createFlowFile(core::ProcessSession& session, const ListedFile& listed_file) {
+std::shared_ptr<core::FlowFile> ListFile::createFlowFile(core::ProcessSession& session, const utils::ListedFile& listed_file) {
auto flow_file = session.create();
- session.putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, listed_file.full_file_path.filename().string());
- session.putAttribute(flow_file, core::SpecialFlowAttribute::ABSOLUTE_PATH, (listed_file.full_file_path.parent_path() / "").string());
+ session.putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, listed_file.getPath().filename().string());
+ session.putAttribute(flow_file, core::SpecialFlowAttribute::ABSOLUTE_PATH, (listed_file.getPath().parent_path() / "").string());
- auto relative_path = std::filesystem::relative(listed_file.full_file_path.parent_path(), input_directory_);
+ auto relative_path = std::filesystem::relative(listed_file.getPath().parent_path(), listed_file.getDirectory());
session.putAttribute(flow_file, core::SpecialFlowAttribute::PATH, (relative_path / "").string());
- session.putAttribute(flow_file, "file.size", std::to_string(utils::file::file_size(listed_file.full_file_path)));
- session.putAttribute(flow_file, "file.lastModifiedTime", utils::timeutils::getDateTimeStr(std::chrono::time_point_cast<std::chrono::seconds>(listed_file.last_modified_time)));
+ session.putAttribute(flow_file, ListFile::FileSize.name, std::to_string(utils::file::file_size(listed_file.getPath())));
+ session.putAttribute(flow_file, ListFile::FileLastModifiedTime.name, utils::timeutils::getDateTimeStr(std::chrono::time_point_cast<std::chrono::seconds>(listed_file.getLastModified())));
- if (auto permission_string = utils::file::FileUtils::get_permission_string(listed_file.full_file_path)) {
- session.putAttribute(flow_file, "file.permissions", *permission_string);
+ if (auto permission_string = utils::file::FileUtils::get_permission_string(listed_file.getPath())) {
+ session.putAttribute(flow_file, ListFile::FilePermissions.name, *permission_string);
} else {
- logger_->log_warn("Failed to get permissions of file '%s'", listed_file.full_file_path.string());
- session.putAttribute(flow_file, "file.permissions", "");
+ logger_->log_warn("Failed to get permissions of file '%s'", listed_file.getPath().string());
+ session.putAttribute(flow_file, ListFile::FilePermissions.name, "");
}
- if (auto owner = utils::file::FileUtils::get_file_owner(listed_file.full_file_path)) {
- session.putAttribute(flow_file, "file.owner", *owner);
+ if (auto owner = utils::file::FileUtils::get_file_owner(listed_file.getPath())) {
+ session.putAttribute(flow_file, ListFile::FileOwner.name, *owner);
} else {
- logger_->log_warn("Failed to get owner of file '%s'", listed_file.full_file_path.string());
- session.putAttribute(flow_file, "file.owner", "");
+ logger_->log_warn("Failed to get owner of file '%s'", listed_file.getPath().string());
+ session.putAttribute(flow_file, ListFile::FileOwner.name, "");
}
#ifndef WIN32
- if (auto group = utils::file::FileUtils::get_file_group(listed_file.full_file_path)) {
- session.putAttribute(flow_file, "file.group", *group);
+ if (auto group = utils::file::FileUtils::get_file_group(listed_file.getPath())) {
+ session.putAttribute(flow_file, ListFile::FileGroup.name, *group);
} else {
- logger_->log_warn("Failed to get group of file '%s'", listed_file.full_file_path.string());
- session.putAttribute(flow_file, "file.group", "");
+ logger_->log_warn("Failed to get group of file '%s'", listed_file.getPath().string());
+ session.putAttribute(flow_file, ListFile::FileGroup.name, "");
}
#else
- session.putAttribute(flow_file, "file.group", "");
+ session.putAttribute(flow_file, ListFile::FileGroup.name, "");
#endif
return flow_file;
@@ -170,33 +115,19 @@
void ListFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
gsl_Expects(context && session);
- logger_->log_trace("ListFile onTrigger");
auto stored_listing_state = state_manager_->getCurrentState();
auto latest_listing_state = stored_listing_state;
uint32_t files_listed = 0;
auto process_files = [&](const std::filesystem::path& path, const std::filesystem::path& filename) {
- ListedFile listed_file;
- listed_file.full_file_path = path / filename;
- if (auto last_modified_time = utils::file::last_write_time(listed_file.full_file_path)) {
- listed_file.last_modified_time = std::chrono::time_point_cast<std::chrono::milliseconds>(utils::file::to_sys(*last_modified_time));
- } else {
- logger_->log_warn("Could not get last modification time of file '%s'", listed_file.full_file_path.string());
- listed_file.last_modified_time = {};
- }
+ auto listed_file = utils::ListedFile(path / filename, input_directory_);
- if (stored_listing_state.wasObjectListedAlready(listed_file)) {
- logger_->log_debug("File '%s' was already listed.", listed_file.full_file_path.string());
+ if (stored_listing_state.wasObjectListedAlready(listed_file) || !listed_file.matches(file_filter_)) {
return true;
}
- if (!fileMatchesFilters(listed_file)) {
- return true;
- }
-
- auto flow_file = createFlowFile(*session, listed_file);
- session->transfer(flow_file, Success);
+ session->transfer(createFlowFile(*session, listed_file), Success);
++files_listed;
latest_listing_state.updateState(listed_file);
return true;
diff --git a/extensions/standard-processors/processors/ListFile.h b/extensions/standard-processors/processors/ListFile.h
index 49de33a..40dabf9 100644
--- a/extensions/standard-processors/processors/ListFile.h
+++ b/extensions/standard-processors/processors/ListFile.h
@@ -31,6 +31,7 @@
#include "core/logging/LoggerConfiguration.h"
#include "utils/Enum.h"
#include "utils/ListingStateManager.h"
+#include "utils/file/ListedFile.h"
#include "utils/file/FileUtils.h"
namespace org::apache::nifi::minifi::processors {
@@ -144,33 +145,13 @@
void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
private:
- struct ListedFile : public utils::ListedObject {
- [[nodiscard]] std::chrono::time_point<std::chrono::system_clock> getLastModified() const override {
- return last_modified_time;
- }
-
- [[nodiscard]] std::string getKey() const override {
- return full_file_path.string();
- }
-
- std::chrono::time_point<std::chrono::system_clock> last_modified_time;
- std::filesystem::path full_file_path;
- };
-
- bool fileMatchesFilters(const ListedFile& listed_file);
- std::shared_ptr<core::FlowFile> createFlowFile(core::ProcessSession& session, const ListedFile& listed_file);
+ std::shared_ptr<core::FlowFile> createFlowFile(core::ProcessSession& session, const utils::ListedFile& listed_file);
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ListFile>::getLogger(uuid_);
std::filesystem::path input_directory_;
std::unique_ptr<minifi::utils::ListingStateManager> state_manager_;
bool recurse_subdirectories_ = true;
- std::optional<std::regex> file_filter_;
- std::optional<std::regex> path_filter_;
- std::optional<std::chrono::milliseconds> minimum_file_age_;
- std::optional<std::chrono::milliseconds> maximum_file_age_;
- std::optional<uint64_t> minimum_file_size_;
- std::optional<uint64_t> maximum_file_size_;
- bool ignore_hidden_files_ = true;
+ utils::FileFilter file_filter_{};
};
} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/PutFile.cpp b/extensions/standard-processors/processors/PutFile.cpp
index c109a66..9db95e6 100644
--- a/extensions/standard-processors/processors/PutFile.cpp
+++ b/extensions/standard-processors/processors/PutFile.cpp
@@ -26,10 +26,9 @@
#include <memory>
#include <string>
#include <utility>
-#ifdef WIN32
-#include <Windows.h>
-#endif
#include "utils/file/FileUtils.h"
+#include "utils/file/FileWriterCallback.h"
+#include "utils/ProcessorConfigUtils.h"
#include "utils/gsl.h"
#include "core/Resource.h"
@@ -43,16 +42,10 @@
}
void PutFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory* /*sessionFactory*/) {
- if (!context->getProperty(ConflictResolution, conflict_resolution_)) {
- logger_->log_error("Conflict Resolution Strategy attribute is missing or invalid");
- }
-
- std::string value;
- context->getProperty(CreateDirs, value);
- try_mkdirs_ = utils::StringUtils::toBool(value).value_or(true);
-
- if (context->getProperty(MaxDestFiles, value)) {
- core::Property::StringToInt(value, max_dest_files_);
+ conflict_resolution_strategy_ = utils::parseEnumProperty<FileExistsResolutionStrategy>(*context, ConflictResolution);
+ try_mkdirs_ = context->getProperty<bool>(CreateDirs).value_or(true);
+ if (auto max_dest_files = context->getProperty<int64_t>(MaxDestFiles); max_dest_files && *max_dest_files > 0) {
+ max_dest_files_ = gsl::narrow_cast<uint64_t>(*max_dest_files);
}
#ifndef WIN32
@@ -61,132 +54,91 @@
#endif
}
-void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
- if (IsNullOrEmpty(conflict_resolution_)) {
- logger_->log_error("Conflict resolution value is invalid");
- context->yield();
- return;
- }
-
- std::shared_ptr<core::FlowFile> flowFile = session->get();
-
- // Do nothing if there are no incoming files
- if (!flowFile) {
- return;
- }
-
- session->remove(flowFile);
-
+std::optional<std::filesystem::path> PutFile::getDestinationPath(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
std::filesystem::path directory;
-
- if (auto directory_str = context->getProperty(Directory, flowFile)) {
+ if (auto directory_str = context.getProperty(Directory, flow_file); directory_str && !directory_str->empty()) {
directory = *directory_str;
} else {
- logger_->log_error("Directory attribute is missing or invalid");
- }
-
- if (IsNullOrEmpty(directory)) {
logger_->log_error("Directory attribute evaluated to invalid value");
- session->transfer(flowFile, Failure);
+ return std::nullopt;
+ }
+ auto file_name_str = flow_file->getAttribute(core::SpecialFlowAttribute::FILENAME).value_or(flow_file->getUUIDStr());
+
+ return directory / file_name_str;
+}
+
+bool PutFile::directoryIsFull(const std::filesystem::path& directory) const {
+ return max_dest_files_ && utils::file::countNumberOfFiles(directory) >= *max_dest_files_;
+}
+
+void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+ std::shared_ptr<core::FlowFile> flow_file = session->get();
+
+ // Do nothing if there are no incoming files
+ if (!flow_file) {
return;
}
- std::string filename;
- flowFile->getAttribute(core::SpecialFlowAttribute::FILENAME, filename);
- auto tmpFile = tmpWritePath(filename, directory);
+ auto dest_path = getDestinationPath(*context, flow_file);
+ if (!dest_path) {
+ return session->transfer(flow_file, Failure);
+ }
- logger_->log_debug("PutFile using temporary file %s", tmpFile.string());
+ logger_->log_debug("PutFile writing file %s into directory %s", dest_path->filename().string(), dest_path->parent_path().string());
- // Determine dest full file paths
- auto destFile = directory / filename;
+ if (directoryIsFull(dest_path->parent_path())) {
+ logger_->log_warn("Routing to failure because the output directory %s has at least %u files, which exceeds the "
+ "configured max number of files", dest_path->parent_path().string(), *max_dest_files_);
+ return session->transfer(flow_file, Failure);
+ }
- logger_->log_debug("PutFile writing file %s into directory %s", filename, directory.string());
-
- if ((max_dest_files_ != -1) && utils::file::is_directory(directory)) {
- int64_t count = 0;
-
- // Callback, called for each file entry in the listed directory
- // Return value is used to break (false) or continue (true) listing
- auto lambda = [&count, this](const std::filesystem::path&, const std::filesystem::path&) -> bool {
- return ++count < max_dest_files_;
- };
-
- utils::file::list_dir(directory, lambda, logger_, false);
-
- if (count >= max_dest_files_) {
- logger_->log_warn("Routing to failure because the output directory %s has at least %u files, which exceeds the "
- "configured max number of files", directory.string(), max_dest_files_);
- session->transfer(flowFile, Failure);
- return;
+ if (utils::file::exists(*dest_path)) {
+ logger_->log_warn("Destination file %s exists; applying Conflict Resolution Strategy: %s", dest_path->string(), std::string(magic_enum::enum_name(conflict_resolution_strategy_)));
+ if (conflict_resolution_strategy_ == FileExistsResolutionStrategy::fail) {
+ return session->transfer(flow_file, Failure);
+ } else if (conflict_resolution_strategy_ == FileExistsResolutionStrategy::ignore) {
+ return session->transfer(flow_file, Success);
}
}
- if (utils::file::exists(destFile)) {
- logger_->log_warn("Destination file %s exists; applying Conflict Resolution Strategy: %s", destFile.string(), conflict_resolution_);
-
- if (conflict_resolution_ == CONFLICT_RESOLUTION_STRATEGY_REPLACE) {
- putFile(session, flowFile, tmpFile, destFile, directory);
- } else if (conflict_resolution_ == CONFLICT_RESOLUTION_STRATEGY_IGNORE) {
- session->transfer(flowFile, Success);
- } else {
- session->transfer(flowFile, Failure);
- }
- } else {
- putFile(session, flowFile, tmpFile, destFile, directory);
- }
+ putFile(*session, flow_file, *dest_path);
}
-std::filesystem::path PutFile::tmpWritePath(const std::filesystem::path& filename, const std::filesystem::path& directory) {
- utils::Identifier tmpFileUuid = id_generator_->generate();
- auto new_filename = std::filesystem::path("." + filename.filename().string());
- new_filename += "." + tmpFileUuid.to_string();
- return (directory / filename.parent_path() / new_filename);
-}
-
-bool PutFile::putFile(core::ProcessSession *session,
- const std::shared_ptr<core::FlowFile>& flowFile,
- const std::filesystem::path& tmpFile,
- const std::filesystem::path& destFile,
- const std::filesystem::path& destDir) {
- if (!utils::file::exists(destDir) && try_mkdirs_) {
- logger_->log_debug("Destination directory does not exist; will attempt to create: %s", destDir.string());
- utils::file::create_dir(destDir, true);
+void PutFile::prepareDirectory(const std::filesystem::path& directory_path) const {
+ if (!utils::file::exists(directory_path) && try_mkdirs_) {
+ logger_->log_debug("Destination directory does not exist; will attempt to create: %s", directory_path.string());
+ utils::file::create_dir(directory_path, true);
#ifndef WIN32
if (directory_permissions_.valid()) {
- utils::file::set_permissions(destDir, directory_permissions_.getValue());
+ utils::file::set_permissions(directory_path, directory_permissions_.getValue());
}
#endif
}
+}
+
+void PutFile::putFile(core::ProcessSession& session,
+ const std::shared_ptr<core::FlowFile>& flow_file,
+ const std::filesystem::path& dest_file) {
+ prepareDirectory(dest_file.parent_path());
bool success = false;
- if (flowFile->getSize() > 0) {
- ReadCallback cb(tmpFile, destFile);
- session->read(flowFile, std::ref(cb));
- logger_->log_debug("Committing %s", destFile.string());
- success = cb.commit();
+ utils::FileWriterCallback file_writer_callback(dest_file);
+ auto read_result = session.read(flow_file, std::ref(file_writer_callback));
+ if (io::isError(read_result)) {
+ logger_->log_error("Failed to write to %s", dest_file.string());
+ success = false;
} else {
- std::ofstream outfile(destFile, std::ios::out | std::ios::binary);
- if (!outfile.good()) {
- logger_->log_error("Failed to create empty file: %s", destFile.string());
- } else {
- success = true;
- }
+ success = file_writer_callback.commit();
}
#ifndef WIN32
if (permissions_.valid()) {
- utils::file::set_permissions(destFile, permissions_.getValue());
+ utils::file::set_permissions(dest_file, permissions_.getValue());
}
#endif
- if (success) {
- session->transfer(flowFile, Success);
- return true;
- } else {
- session->transfer(flowFile, Failure);
- }
- return false;
+ session.transfer(flow_file, success ? Success : Failure);
}
#ifndef WIN32
@@ -227,66 +179,6 @@
}
#endif
-PutFile::ReadCallback::ReadCallback(std::filesystem::path tmp_file, std::filesystem::path dest_file)
- : tmp_file_(std::move(tmp_file)),
- dest_file_(std::move(dest_file)) {
-}
-
-// Copy the entire file contents to the temporary file
-int64_t PutFile::ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) {
- // Copy file contents into tmp file
- write_succeeded_ = false;
- size_t size = 0;
- std::array<std::byte, 1024> buffer{};
-
- std::ofstream tmp_file_os(tmp_file_, std::ios::out | std::ios::binary);
-
- do {
- const auto read = stream->read(buffer);
- if (io::isError(read)) return -1;
- if (read == 0) break;
- tmp_file_os.write(reinterpret_cast<char *>(buffer.data()), gsl::narrow<std::streamsize>(read));
- size += read;
- } while (size < stream->size());
-
- tmp_file_os.close();
-
- if (tmp_file_os) {
- write_succeeded_ = true;
- }
-
- return gsl::narrow<int64_t>(size);
-}
-
-// Renames tmp file to final destination
-// Returns true if commit succeeded
-bool PutFile::ReadCallback::commit() {
- bool success = false;
-
- logger_->log_info("PutFile committing put file operation to %s", dest_file_.string());
-
- if (write_succeeded_) {
- std::error_code rename_error;
- std::filesystem::rename(tmp_file_, dest_file_, rename_error);
- if (rename_error) {
- logger_->log_info("PutFile commit put file operation to %s failed because std::filesystem::rename call failed", dest_file_.string());
- } else {
- success = true;
- logger_->log_info("PutFile commit put file operation to %s succeeded", dest_file_.string());
- }
- } else {
- logger_->log_error("PutFile commit put file operation to %s failed because write failed", dest_file_.string());
- }
-
- return success;
-}
-
-// Clean up resources
-PutFile::ReadCallback::~ReadCallback() {
- // Clean up tmp file, if necessary
- std::filesystem::remove(tmp_file_);
-}
-
REGISTER_RESOURCE(PutFile, Processor);
} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/PutFile.h b/extensions/standard-processors/processors/PutFile.h
index b9eff26..bb88f82 100644
--- a/extensions/standard-processors/processors/PutFile.h
+++ b/extensions/standard-processors/processors/PutFile.h
@@ -1,7 +1,4 @@
/**
- * @file PutFile.h
- * PutFile class declaration
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -34,21 +31,24 @@
#include "core/logging/LoggerConfiguration.h"
#include "utils/Id.h"
#include "utils/Export.h"
+#include "utils/Enum.h"
namespace org::apache::nifi::minifi::processors {
class PutFile : public core::Processor {
public:
- static constexpr std::string_view CONFLICT_RESOLUTION_STRATEGY_REPLACE = "replace";
- static constexpr std::string_view CONFLICT_RESOLUTION_STRATEGY_IGNORE = "ignore";
- static constexpr std::string_view CONFLICT_RESOLUTION_STRATEGY_FAIL = "fail";
-
explicit PutFile(std::string_view name, const utils::Identifier& uuid = {})
: core::Processor(name, uuid) {
}
~PutFile() override = default;
+ enum class FileExistsResolutionStrategy {
+ fail,
+ replace,
+ ignore
+ };
+
EXTENSIONAPI static constexpr const char* Description = "Writes the contents of a FlowFile to the local file system";
#ifndef WIN32
@@ -66,10 +66,10 @@
.supportsExpressionLanguage(true)
.withDefaultValue(".")
.build();
- EXTENSIONAPI static constexpr auto ConflictResolution = core::PropertyDefinitionBuilder<3>::createProperty("Conflict Resolution Strategy")
+ EXTENSIONAPI static constexpr auto ConflictResolution = core::PropertyDefinitionBuilder<magic_enum::enum_count<FileExistsResolutionStrategy>()>::createProperty("Conflict Resolution Strategy")
.withDescription("Indicates what should happen when a file with the same name already exists in the output directory")
- .withAllowedValues({CONFLICT_RESOLUTION_STRATEGY_FAIL, CONFLICT_RESOLUTION_STRATEGY_IGNORE, CONFLICT_RESOLUTION_STRATEGY_REPLACE})
- .withDefaultValue(CONFLICT_RESOLUTION_STRATEGY_FAIL)
+ .withDefaultValue(magic_enum::enum_name(FileExistsResolutionStrategy::fail))
+ .withAllowedValues(magic_enum::enum_names<FileExistsResolutionStrategy>())
.build();
EXTENSIONAPI static constexpr auto CreateDirs = core::PropertyDefinitionBuilder<0, 1>::createProperty("Create Missing Directories")
.withDescription("If true, then missing destination directories will be created. If false, flowfiles are penalized and sent to failure.")
@@ -111,38 +111,15 @@
void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
void initialize() override;
- class ReadCallback {
- public:
- ReadCallback(std::filesystem::path tmp_file, std::filesystem::path dest_file);
- ~ReadCallback();
- int64_t operator()(const std::shared_ptr<io::InputStream>& stream);
- bool commit();
-
- private:
- std::shared_ptr<core::logging::Logger> logger_{ core::logging::LoggerFactory<PutFile::ReadCallback>::getLogger() };
- bool write_succeeded_ = false;
- std::filesystem::path tmp_file_;
- std::filesystem::path dest_file_;
- };
-
- /**
- * Generate a safe (universally-unique) temporary filename on the same partition
- *
- * @param filename from which to generate temporary write file path
- * @return
- */
- static std::filesystem::path tmpWritePath(const std::filesystem::path& filename, const std::filesystem::path& directory);
-
private:
- std::string conflict_resolution_;
+ FileExistsResolutionStrategy conflict_resolution_strategy_ = FileExistsResolutionStrategy::fail;
bool try_mkdirs_ = true;
- int64_t max_dest_files_ = -1;
+ std::optional<uint64_t> max_dest_files_ = std::nullopt;
- bool putFile(core::ProcessSession *session,
- const std::shared_ptr<core::FlowFile>& flowFile,
- const std::filesystem::path& tmpFile,
- const std::filesystem::path& destFile,
- const std::filesystem::path& destDir);
+ void prepareDirectory(const std::filesystem::path& directory_path) const;
+ bool directoryIsFull(const std::filesystem::path& directory) const;
+ std::optional<std::filesystem::path> getDestinationPath(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file);
+ void putFile(core::ProcessSession& session, const std::shared_ptr<core::FlowFile>& flow_file, const std::filesystem::path& dest_file);
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PutFile>::getLogger(uuid_);
static std::shared_ptr<utils::IdGenerator> id_generator_;
diff --git a/extensions/standard-processors/tests/unit/PutFileTests.cpp b/extensions/standard-processors/tests/unit/PutFileTests.cpp
index d793ac6..da30d49 100644
--- a/extensions/standard-processors/tests/unit/PutFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutFileTests.cpp
@@ -49,7 +49,6 @@
LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
LogTestController::getInstance().setDebug<TestPlan>();
LogTestController::getInstance().setDebug<minifi::processors::PutFile>();
- LogTestController::getInstance().setDebug<minifi::processors::PutFile::ReadCallback>();
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
std::shared_ptr<TestPlan> plan = testController.createPlan();
@@ -289,12 +288,6 @@
LogTestController::getInstance().reset();
}
-TEST_CASE("Test generation of temporary write path", "[putfileTmpWritePath]") {
- auto processor = std::make_shared<org::apache::nifi::minifi::processors::PutFile>("processorname");
- std::filesystem::path path = std::filesystem::path("a") / std::string("b") / "";
- CHECK(processor->tmpWritePath(path, "").string().starts_with(path.string()));
-}
-
TEST_CASE("PutFileMaxFileCountTest", "[getfileputpfilemaxcount]") {
TestController testController;
@@ -436,7 +429,6 @@
LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
LogTestController::getInstance().setDebug<TestPlan>();
LogTestController::getInstance().setDebug<minifi::processors::PutFile>();
- LogTestController::getInstance().setDebug<minifi::processors::PutFile::ReadCallback>();
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
std::shared_ptr<TestPlan> plan = testController.createPlan();
@@ -479,8 +471,6 @@
REQUIRE_FALSE(utils::file::exists(putfiledir));
REQUIRE_FALSE(utils::file::exists(path));
- std::string check = "Failed to create empty file: " + path.string();
- REQUIRE(LogTestController::getInstance().contains(check));
}
SECTION("with a non-empty file and create directory property set to true") {
@@ -512,8 +502,6 @@
REQUIRE_FALSE(utils::file::exists(putfiledir));
REQUIRE_FALSE(utils::file::exists(path));
- std::string check = "PutFile commit put file operation to " + path.string() + " failed because write failed";
- REQUIRE(LogTestController::getInstance().contains(check));
}
}
diff --git a/extensions/windows-event-log/wel/MetadataWalker.cpp b/extensions/windows-event-log/wel/MetadataWalker.cpp
index 11c2fc7..d0942c4 100644
--- a/extensions/windows-event-log/wel/MetadataWalker.cpp
+++ b/extensions/windows-event-log/wel/MetadataWalker.cpp
@@ -159,10 +159,6 @@
return replaced_identifiers_;
}
-std::string MetadataWalker::to_string(const wchar_t* pChar) {
- return std::wstring_convert<std::codecvt_utf8<wchar_t>>().to_bytes(pChar);
-}
-
template<typename Fn>
requires std::is_convertible_v<std::invoke_result_t<Fn, std::string>, std::string>
void MetadataWalker::updateText(pugi::xml_node &node, const std::string &field_name, Fn &&fn) {
diff --git a/libminifi/include/c2/triggers/FileUpdateTrigger.h b/libminifi/include/c2/triggers/FileUpdateTrigger.h
index 7458410..1c75325 100644
--- a/libminifi/include/c2/triggers/FileUpdateTrigger.h
+++ b/libminifi/include/c2/triggers/FileUpdateTrigger.h
@@ -103,9 +103,9 @@
return true;
}
- std::optional<std::filesystem::file_time_type> getLastUpdate() const;
+ std::optional<std::chrono::file_clock::time_point> getLastUpdate() const;
- void setLastUpdate(const std::optional<std::filesystem::file_time_type> &last_update);
+ void setLastUpdate(const std::optional<std::chrono::file_clock::time_point> &last_update);
protected:
std::string file_;
@@ -114,7 +114,7 @@
private:
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<FileUpdateTrigger>::getLogger();
mutable std::mutex last_update_lock;
- std::optional<std::filesystem::file_time_type> last_update_;
+ std::optional<std::chrono::file_clock::time_point> last_update_;
};
} // namespace org::apache::nifi::minifi::c2
diff --git a/libminifi/include/core/logging/Logger.h b/libminifi/include/core/logging/Logger.h
index fc0c710..e35d155 100644
--- a/libminifi/include/core/logging/Logger.h
+++ b/libminifi/include/core/logging/Logger.h
@@ -153,6 +153,16 @@
Logger(Logger const&) = delete;
Logger& operator=(Logger const&) = delete;
+
+ /**
+ * @brief Log critical message
+ * @param format format string ('man printf' for syntax)
+ * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match
+ */
+ template<typename ...Args>
+ void log_critical(const char * const format, Args&& ...args) {
+ log(spdlog::level::critical, format, std::forward<Args>(args)...);
+ }
/**
* @brief Log error message
* @param format format string ('man printf' for syntax)
@@ -252,4 +262,5 @@
#define LOG_WARN(x) LogBuilder((x).get(), org::apache::nifi::minifi::core::logging::LOG_LEVEL::warn)
+#define LOG_CRITICAL(x) LogBuilder((x).get(), org::apache::nifi::minifi::core::logging::LOG_LEVEL::critical)
} // namespace org::apache::nifi::minifi::core::logging
diff --git a/libminifi/include/utils/LogUtils.h b/libminifi/include/utils/LogUtils.h
index 3244653..6dc8df0 100644
--- a/libminifi/include/utils/LogUtils.h
+++ b/libminifi/include/utils/LogUtils.h
@@ -29,6 +29,7 @@
LOGGING_INFO,
LOGGING_WARN,
LOGGING_ERROR,
+ LOGGING_CRITICAL,
LOGGING_OFF
};
@@ -50,6 +51,9 @@
case LogLevelOption::LOGGING_ERROR:
logger->log_error(std::forward<Args>(args)...);
break;
+ case LogLevelOption::LOGGING_CRITICAL:
+ logger->log_critical(std::forward<Args>(args)...);
+ break;
case LogLevelOption::LOGGING_OFF:
default:
break;
@@ -74,6 +78,8 @@
return "WARN";
case LogLevelOption::LOGGING_ERROR:
return "ERROR";
+ case LogLevelOption::LOGGING_CRITICAL:
+ return "CRITICAL";
case LogLevelOption::LOGGING_OFF:
return "OFF";
}
diff --git a/libminifi/include/utils/OsUtils.h b/libminifi/include/utils/OsUtils.h
index 8378067..0f73052 100644
--- a/libminifi/include/utils/OsUtils.h
+++ b/libminifi/include/utils/OsUtils.h
@@ -51,6 +51,10 @@
#ifdef WIN32
/// Resolves common identifiers
extern std::string resolve_common_identifiers(const std::string &id);
+
+std::wstring stringToWideString(const std::string& string);
+
+std::string wideStringToString(const std::wstring& wide_string);
#endif
std::optional<std::string> getHostName();
diff --git a/libminifi/include/utils/FileReaderCallback.h b/libminifi/include/utils/file/FileReaderCallback.h
similarity index 100%
rename from libminifi/include/utils/FileReaderCallback.h
rename to libminifi/include/utils/file/FileReaderCallback.h
diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h
index 98a0af5..440f578 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -16,6 +16,7 @@
*/
#pragma once
+#include <chrono>
#include <filesystem>
#include <fstream>
#include <memory>
@@ -72,6 +73,7 @@
#include "core/logging/LoggerFactory.h"
#include "utils/StringUtils.h"
+#include "utils/expected.h"
#include "utils/file/PathUtils.h"
#include "utils/gsl.h"
@@ -84,9 +86,9 @@
namespace FileUtils = ::org::apache::nifi::minifi::utils::file;
-std::chrono::system_clock::time_point to_sys(std::filesystem::file_time_type file_time);
+std::chrono::system_clock::time_point to_sys(std::chrono::file_clock::time_point file_time);
-std::filesystem::file_time_type from_sys(std::chrono::system_clock::time_point sys_time);
+std::chrono::file_clock::time_point from_sys(std::chrono::system_clock::time_point sys_time);
inline int64_t delete_dir(const std::filesystem::path& path, bool delete_files_recursively = true) {
// Empty path is interpreted as the root of the current partition on Windows, which should not be allowed
@@ -118,7 +120,7 @@
return std::chrono::time_point<std::chrono::file_clock, std::chrono::seconds>{};
}
-inline std::optional<std::filesystem::file_time_type> last_write_time(const std::filesystem::path& path) {
+inline std::optional<std::chrono::file_clock::time_point> last_write_time(const std::filesystem::path& path) {
std::error_code ec;
auto result = std::filesystem::last_write_time(path, ec);
if (ec.value() == 0) {
@@ -127,10 +129,12 @@
return std::nullopt;
}
-inline bool set_last_write_time(const std::filesystem::path& path, std::filesystem::file_time_type new_time) {
+inline nonstd::expected<void, std::error_code> set_last_write_time(const std::filesystem::path& path, std::chrono::file_clock::time_point new_time) {
std::error_code ec;
std::filesystem::last_write_time(path, new_time, ec);
- return ec.value() == 0;
+ if (ec)
+ return nonstd::make_unexpected(ec);
+ return {};
}
inline uint64_t file_size(const std::filesystem::path& path) {
@@ -570,4 +574,19 @@
return std::filesystem::relative(path, base_path);
}
+inline size_t countNumberOfFiles(const std::filesystem::path& path) {
+ using std::filesystem::directory_iterator;
+ return std::count_if(directory_iterator(path), directory_iterator(), [](const auto& entry) { return entry.is_regular_file(); });
+}
+
+#ifdef WIN32
+struct WindowsFileTimes {
+ std::chrono::file_clock::time_point creation_time;
+ std::chrono::file_clock::time_point last_access_time;
+ std::chrono::file_clock::time_point last_write_time;
+};
+
+nonstd::expected<WindowsFileTimes, std::error_code> getWindowsFileTimes(const std::filesystem::path& path);
+#endif
+
} // namespace org::apache::nifi::minifi::utils::file
diff --git a/libminifi/include/utils/file/FileWriterCallback.h b/libminifi/include/utils/file/FileWriterCallback.h
new file mode 100644
index 0000000..c284d9a
--- /dev/null
+++ b/libminifi/include/utils/file/FileWriterCallback.h
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <filesystem>
+#include "io/StreamPipe.h"
+#include "utils/expected.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+class FileWriterCallback {
+ public:
+ explicit FileWriterCallback(std::filesystem::path dest_path);
+ ~FileWriterCallback();
+ int64_t operator()(const std::shared_ptr<io::InputStream>& stream);
+ bool commit();
+
+
+ private:
+ bool write_succeeded_ = false;
+ std::filesystem::path temp_path_;
+ std::filesystem::path dest_path_;
+};
+} // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/file/ListedFile.h b/libminifi/include/utils/file/ListedFile.h
new file mode 100644
index 0000000..9961f7e
--- /dev/null
+++ b/libminifi/include/utils/file/ListedFile.h
@@ -0,0 +1,104 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <utility>
+#include <string>
+
+#include "../ListingStateManager.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+struct FileFilter {
+ std::optional<std::regex> filename_filter;
+ std::optional<std::regex> path_filter;
+ std::optional<std::chrono::milliseconds> minimum_file_age;
+ std::optional<std::chrono::milliseconds> maximum_file_age;
+ std::optional<uint64_t> minimum_file_size;
+ std::optional<uint64_t> maximum_file_size;
+ bool ignore_hidden_files = true;
+};
+
+class ListedFile : public utils::ListedObject {
+ public:
+ explicit ListedFile(std::filesystem::path full_file_path, std::filesystem::path input_directory) : full_file_path_(std::move(full_file_path)), input_directory_(std::move(input_directory)) {
+ if (auto last_write_time = utils::file::last_write_time(full_file_path_)) {
+ last_modified_time_ = utils::file::to_sys(*last_write_time);
+ }
+ }
+
+ [[nodiscard]] std::chrono::system_clock::time_point getLastModified() const override {
+ return std::chrono::time_point_cast<std::chrono::milliseconds>(last_modified_time_);
+ }
+
+ [[nodiscard]] std::string getKey() const override {
+ return full_file_path_.string();
+ }
+
+ [[nodiscard]] const std::filesystem::path& getPath() const {
+ return full_file_path_;
+ }
+
+ [[nodiscard]] const std::filesystem::path& getDirectory() const {
+ return input_directory_;
+ }
+
+ [[nodiscard]] bool matches(const FileFilter& file_filter) {
+ if (file_filter.ignore_hidden_files && utils::file::FileUtils::is_hidden(full_file_path_))
+ return false;
+
+ return fileAgeIsBetween(file_filter.minimum_file_age, file_filter.maximum_file_age) &&
+ fileSizeIsBetween(file_filter.minimum_file_size, file_filter.maximum_file_size) &&
+ matchesRegex(file_filter.filename_filter, file_filter.path_filter);
+ }
+
+ private:
+ [[nodiscard]] bool matchesRegex(const std::optional<std::regex>& file_regex, const std::optional<std::regex>& path_regex) const {
+ if (file_regex && !std::regex_match(full_file_path_.filename().string(), *file_regex))
+ return false;
+ if (path_regex && !std::regex_match(std::filesystem::relative(full_file_path_.parent_path(), input_directory_).string(), *path_regex))
+ return false;
+ return true;
+ }
+
+ [[nodiscard]] bool fileAgeIsBetween(const std::optional<std::chrono::milliseconds> minimum_age, const std::optional<std::chrono::milliseconds> maximum_age) const {
+ auto file_age = getAge();
+ if (minimum_age && minimum_age > file_age)
+ return false;
+ if (maximum_age && maximum_age < file_age)
+ return false;
+ return true;
+ }
+
+ [[nodiscard]] bool fileSizeIsBetween(const std::optional<size_t> minimum_size, const std::optional<size_t> maximum_size) const {
+ if (minimum_size && minimum_size > getSize())
+ return false;
+ if (maximum_size && maximum_size < getSize())
+ return false;
+ return true;
+ }
+
+ [[nodiscard]] std::chrono::system_clock::duration getAge() const { return std::chrono::system_clock::now() - last_modified_time_;}
+ [[nodiscard]] size_t getSize() const { return utils::file::file_size(full_file_path_); }
+ std::chrono::system_clock::time_point last_modified_time_;
+ std::filesystem::path full_file_path_;
+ std::filesystem::path input_directory_;
+};
+
+} // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/src/c2/triggers/FileUpdateTrigger.cpp b/libminifi/src/c2/triggers/FileUpdateTrigger.cpp
index 3ed74df..c3e94da 100644
--- a/libminifi/src/c2/triggers/FileUpdateTrigger.cpp
+++ b/libminifi/src/c2/triggers/FileUpdateTrigger.cpp
@@ -39,12 +39,12 @@
return response_payload;
}
-std::optional<std::filesystem::file_time_type> FileUpdateTrigger::getLastUpdate() const {
+std::optional<std::chrono::file_clock::time_point> FileUpdateTrigger::getLastUpdate() const {
std::lock_guard<std::mutex> lock(last_update_lock);
return last_update_;
}
-void FileUpdateTrigger::setLastUpdate(const std::optional<std::filesystem::file_time_type> &last_update) {
+void FileUpdateTrigger::setLastUpdate(const std::optional<std::chrono::file_clock::time_point> &last_update) {
std::lock_guard<std::mutex> lock(last_update_lock);
last_update_ = last_update;
}
diff --git a/libminifi/src/core/logging/Logger.cpp b/libminifi/src/core/logging/Logger.cpp
index 8a75189..aafd820 100644
--- a/libminifi/src/core/logging/Logger.cpp
+++ b/libminifi/src/core/logging/Logger.cpp
@@ -100,7 +100,7 @@
void Logger::log_string(LOG_LEVEL level, std::string str) {
switch (level) {
case critical:
- log_warn(str.c_str());
+ log_critical(str.c_str());
break;
case err:
log_error(str.c_str());
diff --git a/libminifi/src/utils/NetworkInterfaceInfo.cpp b/libminifi/src/utils/NetworkInterfaceInfo.cpp
index fc9dc57..e86bb32 100644
--- a/libminifi/src/utils/NetworkInterfaceInfo.cpp
+++ b/libminifi/src/utils/NetworkInterfaceInfo.cpp
@@ -18,11 +18,9 @@
#include "utils/net/Socket.h"
#include "core/logging/LoggerConfiguration.h"
#ifdef WIN32
-#include <Windows.h>
-#include <winsock2.h>
#include <iphlpapi.h>
-#include <WS2tcpip.h>
#pragma comment(lib, "IPHLPAPI.lib")
+#include "utils/OsUtils.h"
#else
#include <unistd.h>
#include <netinet/in.h>
@@ -37,19 +35,9 @@
std::shared_ptr<core::logging::Logger> NetworkInterfaceInfo::logger_ = core::logging::LoggerFactory<NetworkInterfaceInfo>::getLogger();
#ifdef WIN32
-namespace {
-std::string utf8_encode(const std::wstring& wstr) {
- if (wstr.empty())
- return std::string();
- int size_needed = WideCharToMultiByte(CP_UTF8, 0, wstr.c_str(), -1, nullptr, 0, nullptr, nullptr);
- std::string result_string(size_needed, 0);
- WideCharToMultiByte(CP_UTF8, 0, wstr.c_str(), -1, result_string.data(), size_needed, nullptr, nullptr);
- return result_string;
-}
-}
NetworkInterfaceInfo::NetworkInterfaceInfo(const IP_ADAPTER_ADDRESSES* adapter) {
- name_ = utf8_encode(adapter->FriendlyName);
+ name_ = OsUtils::wideStringToString(adapter->FriendlyName);
for (auto unicast_address = adapter->FirstUnicastAddress; unicast_address != nullptr; unicast_address = unicast_address->Next) {
if (unicast_address->Address.lpSockaddr->sa_family == AF_INET) {
ip_v4_addresses_.push_back(net::sockaddr_ntop(unicast_address->Address.lpSockaddr));
@@ -94,7 +82,7 @@
return network_adapters;
}
std::vector<char> bytes(buffer_length, 0);
- IP_ADAPTER_ADDRESSES* adapter = reinterpret_cast<IP_ADAPTER_ADDRESSES*>(bytes.data());
+ auto* adapter = reinterpret_cast<IP_ADAPTER_ADDRESSES*>(bytes.data());
get_adapters_err = GetAdaptersAddresses(0, 0, nullptr, adapter, &buffer_length);
if (NO_ERROR != get_adapters_err) {
logger_->log_error("GetAdaptersAddresses failed: %lu", get_adapters_err);
diff --git a/libminifi/src/utils/OsUtils.cpp b/libminifi/src/utils/OsUtils.cpp
index e1c03a9..f865000 100644
--- a/libminifi/src/utils/OsUtils.cpp
+++ b/libminifi/src/utils/OsUtils.cpp
@@ -21,6 +21,7 @@
#include <iostream>
#include <map>
+#include "fmt/format.h"
#include "utils/gsl.h"
#include "Exception.h"
@@ -89,31 +90,32 @@
name = uid;
if (!name.empty()) {
#ifdef _WIN32
- const auto resolved_name = resolve_common_identifiers(name);
+ auto resolved_name = resolve_common_identifiers(name);
if (!resolved_name.empty()) {
return resolved_name;
}
// First call to LookupAccountSid to get the buffer sizes.
- PSID pSidOwner = NULL;
- const auto guard_pSidOwner = gsl::finally([&pSidOwner]() { if (pSidOwner != NULL) { LocalFree(pSidOwner); } });
+ PSID pSidOwner = nullptr;
+ const auto guard_pSidOwner = gsl::finally([&pSidOwner]() { if (pSidOwner != nullptr) { LocalFree(pSidOwner); } });
if (ConvertStringSidToSidA(name.c_str(), &pSidOwner)) {
SID_NAME_USE sidType = SidTypeUnknown;
- DWORD windowsAccountNameSize = 0, dwwindowsDomainSize = 0;
+ DWORD windowsAccountNameSize = 0;
+ DWORD dwwindowsDomainSize = 0;
/*
We can use a unique ptr with a deleter here but some of the calls
below require we use global alloc -- so a global deleter to call GlobalFree
won't buy us a ton unless we anticipate requiring more of this. If we do
I suggest we break this out until a subset of OsUtils into our own convenience functions.
*/
- LPTSTR windowsDomain = NULL;
- LPTSTR windowsAccount = NULL;
+ LPTSTR windowsDomain = nullptr;
+ LPTSTR windowsAccount = nullptr;
/*
The first call will be to obtain sizes for domain and account,
after which we will allocate the memory and free it after.
In some cases youc an replace GlobalAlloc with
*/
- LookupAccountSid(NULL /** local computer **/, pSidOwner,
+ LookupAccountSid(nullptr /** local computer **/, pSidOwner,
windowsAccount,
(LPDWORD)&windowsAccountNameSize,
windowsDomain,
@@ -132,7 +134,7 @@
}
if (LookupAccountSid(
- NULL,
+ nullptr,
pSidOwner,
windowsAccount,
(LPDWORD)&windowsAccountNameSize,
@@ -337,6 +339,36 @@
return {hostname};
}
+#ifdef WIN32
+std::wstring OsUtils::stringToWideString(const std::string& string) {
+ if (string.empty())
+ return {};
+
+ const auto size_needed = MultiByteToWideChar(CP_UTF8, 0, &string.at(0), gsl::narrow<int>(string.size()), nullptr, 0);
+ if (size_needed <= 0) {
+ throw std::runtime_error(fmt::format("MultiByteToWideChar() returned: {}, due to {}", std::to_string(size_needed), utils::OsUtils::windowsErrorToErrorCode(GetLastError()).message()));
+ }
+
+ std::wstring result(size_needed, L'\0');
+ MultiByteToWideChar(CP_UTF8, 0, &string.at(0), gsl::narrow<int>(string.size()), &result.at(0), size_needed);
+ return result;
+}
+
+std::string OsUtils::wideStringToString(const std::wstring& wide_string) {
+ if (wide_string.empty())
+ return {};
+
+ const auto size_needed = WideCharToMultiByte(CP_UTF8, 0, &wide_string.at(0), gsl::narrow<int>(wide_string.size()), nullptr, 0, nullptr, nullptr);
+ if (size_needed <= 0) {
+ throw std::runtime_error(fmt::format("WideCharToMultiByte() returned: {}, due to {}", std::to_string(size_needed), utils::OsUtils::windowsErrorToErrorCode(GetLastError()).message()));
+ }
+
+ std::string result(size_needed, 0);
+ WideCharToMultiByte(CP_UTF8, 0, &wide_string.at(0), gsl::narrow<int>(wide_string.size()), &result.at(0), size_needed, nullptr, nullptr);
+ return result;
+}
+#endif
+
std::optional<double> OsUtils::getSystemLoadAverage() {
#ifndef WIN32
double load_avg[1];
diff --git a/libminifi/src/utils/FileReaderCallback.cpp b/libminifi/src/utils/file/FileReaderCallback.cpp
similarity index 97%
rename from libminifi/src/utils/FileReaderCallback.cpp
rename to libminifi/src/utils/file/FileReaderCallback.cpp
index aa4fb6c..cbfec6c 100644
--- a/libminifi/src/utils/FileReaderCallback.cpp
+++ b/libminifi/src/utils/file/FileReaderCallback.cpp
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "utils/FileReaderCallback.h"
+#include "utils/file/FileReaderCallback.h"
#include <cinttypes>
#include <cstring>
diff --git a/libminifi/src/utils/file/FileUtils.cpp b/libminifi/src/utils/file/FileUtils.cpp
index 79f8df9..ef22dcb 100644
--- a/libminifi/src/utils/file/FileUtils.cpp
+++ b/libminifi/src/utils/file/FileUtils.cpp
@@ -25,6 +25,10 @@
#include "utils/Literals.h"
#include "utils/Searcher.h"
+#ifdef WIN32
+#include "utils/OsUtils.h"
+#endif
+
namespace org::apache::nifi::minifi::utils::file {
uint64_t computeChecksum(const std::filesystem::path& file_name, uint64_t up_to_position) {
@@ -66,7 +70,7 @@
return std::search(view.begin(), view.end(), searcher) != view.end();
}
-std::chrono::system_clock::time_point to_sys(std::filesystem::file_time_type file_time) {
+std::chrono::system_clock::time_point to_sys(std::chrono::file_clock::time_point file_time) {
using namespace std::chrono; // NOLINT(build/namespaces)
#if defined(WIN32)
// workaround for https://github.com/microsoft/STL/issues/2446
@@ -81,7 +85,7 @@
#endif
}
-std::filesystem::file_time_type from_sys(std::chrono::system_clock::time_point sys_time) {
+std::chrono::file_clock::time_point from_sys(std::chrono::system_clock::time_point sys_time) {
using namespace std::chrono; // NOLINT(build/namespaces)
#if defined(WIN32)
// workaround for https://github.com/microsoft/STL/issues/2446
@@ -96,4 +100,23 @@
#endif
}
+#ifdef WIN32
+std::chrono::file_clock::time_point fileTimePointFromFileTime(const FILETIME& filetime) {
+ // FILETIME contains a 64-bit value representing the number of 100-nanosecond intervals since January 1, 1601 (UTC).
+ static_assert(std::ratio_equal_v<std::chrono::file_clock::duration::period, std::ratio<1, 10000000>>, "file_clock duration tick period must be 100 nanoseconds");
+ std::chrono::file_clock::duration duration{(static_cast<int64_t>(filetime.dwHighDateTime) << 32) | filetime.dwLowDateTime};
+ return std::chrono::file_clock::time_point{duration};
+}
+
+nonstd::expected<WindowsFileTimes, std::error_code> getWindowsFileTimes(const std::filesystem::path& path) {
+ WIN32_FILE_ATTRIBUTE_DATA file_attributes;
+ auto get_file_attributes_result = GetFileAttributesExW(path.c_str(), GetFileExInfoStandard, &file_attributes);
+ if (!get_file_attributes_result)
+ return nonstd::make_unexpected(utils::OsUtils::windowsErrorToErrorCode(GetLastError()));
+ return WindowsFileTimes{.creation_time = fileTimePointFromFileTime(file_attributes.ftCreationTime),
+ .last_access_time = fileTimePointFromFileTime(file_attributes.ftLastAccessTime),
+ .last_write_time = fileTimePointFromFileTime(file_attributes.ftLastWriteTime)};
+}
+#endif // WIN32
+
} // namespace org::apache::nifi::minifi::utils::file
diff --git a/libminifi/src/utils/file/FileWriterCallback.cpp b/libminifi/src/utils/file/FileWriterCallback.cpp
new file mode 100644
index 0000000..3a1542e
--- /dev/null
+++ b/libminifi/src/utils/file/FileWriterCallback.cpp
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/file/FileWriterCallback.h"
+#include <fstream>
+
+namespace org::apache::nifi::minifi::utils {
+
+FileWriterCallback::FileWriterCallback(std::filesystem::path dest_path)
+ : dest_path_(std::move(dest_path)) {
+ auto new_filename = std::filesystem::path("." + dest_path_.filename().string() + "." + utils::IdGenerator::getIdGenerator()->generate().to_string());
+ temp_path_ = dest_path_.parent_path() / new_filename;
+}
+
+FileWriterCallback::~FileWriterCallback() {
+ std::error_code remove_error;
+ std::filesystem::remove(temp_path_, remove_error);
+}
+
+int64_t FileWriterCallback::operator()(const std::shared_ptr<io::InputStream>& stream) {
+ write_succeeded_ = false;
+ size_t size = 0;
+ std::array<std::byte, 1024> buffer{};
+
+ std::ofstream tmp_file_os(temp_path_, std::ios::out | std::ios::binary);
+
+ do {
+ const auto read = stream->read(buffer);
+ if (io::isError(read)) return -1;
+ if (read == 0) break;
+ tmp_file_os.write(reinterpret_cast<char *>(buffer.data()), gsl::narrow<std::streamsize>(read));
+ size += read;
+ } while (size < stream->size());
+
+ tmp_file_os.close();
+
+ if (tmp_file_os) {
+ write_succeeded_ = true;
+ }
+
+ return gsl::narrow<int64_t>(size);
+}
+
+bool FileWriterCallback::commit() {
+ if (!write_succeeded_)
+ return false;
+
+ std::error_code rename_error;
+ std::filesystem::rename(temp_path_, dest_path_, rename_error);
+ return !rename_error;
+}
+} // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/test/Utils.h b/libminifi/test/Utils.h
index b3ed118..5fbc581 100644
--- a/libminifi/test/Utils.h
+++ b/libminifi/test/Utils.h
@@ -60,7 +60,7 @@
} else if (expected.IsArray()) {
REQUIRE(actual.IsArray());
REQUIRE(actual.Size() == expected.Size());
- for (size_t idx{0}; idx < expected.Size(); ++idx) {
+ for (rapidjson::SizeType idx{0}; idx < expected.Size(); ++idx) {
matchJSON(actual[idx], expected[idx]);
}
} else {
diff --git a/libminifi/test/archive-tests/FocusArchiveTests.cpp b/libminifi/test/archive-tests/FocusArchiveTests.cpp
index 51d8a7f..9ea80b8 100644
--- a/libminifi/test/archive-tests/FocusArchiveTests.cpp
+++ b/libminifi/test/archive-tests/FocusArchiveTests.cpp
@@ -45,86 +45,87 @@
const char* FOCUSED_FILE = FILE_NAMES[0];
const char* FOCUSED_CONTENT = FILE_CONTENT[0];
+namespace org::apache::nifi::minifi::processors::test {
TEST_CASE("Test Creation of FocusArchiveEntry", "[getfileCreate]") {
TestController testController;
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::FocusArchiveEntry>("processorname");
+ std::shared_ptr<core::Processor> processor = std::make_shared<FocusArchiveEntry>("processorname");
REQUIRE(processor->getName() == "processorname");
}
TEST_CASE("Test Creation of UnfocusArchiveEntry", "[getfileCreate]") {
- TestController testController;
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::UnfocusArchiveEntry>("processorname");
- REQUIRE(processor->getName() == "processorname");
- REQUIRE(processor->getUUID());
+ TestController testController;
+ std::shared_ptr<core::Processor> processor = std::make_shared<UnfocusArchiveEntry>("processorname");
+ REQUIRE(processor->getName() == "processorname");
+ REQUIRE(processor->getUUID());
}
TEST_CASE("FocusArchive", "[testFocusArchive]") {
- TestController testController;
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::FocusArchiveEntry>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::UnfocusArchiveEntry>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::PutFile>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::GetFile>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
- LogTestController::getInstance().setTrace<core::ProcessSession>();
- LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::FlowFile>();
+ TestController testController;
+ LogTestController::getInstance().setTrace<FocusArchiveEntry>();
+ LogTestController::getInstance().setTrace<UnfocusArchiveEntry>();
+ LogTestController::getInstance().setTrace<PutFile>();
+ LogTestController::getInstance().setTrace<GetFile>();
+ LogTestController::getInstance().setTrace<LogAttribute>();
+ LogTestController::getInstance().setTrace<core::ProcessSession>();
+ LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+ LogTestController::getInstance().setTrace<Connection>();
+ LogTestController::getInstance().setTrace<core::Connectable>();
+ LogTestController::getInstance().setTrace<core::FlowFile>();
- std::shared_ptr<TestPlan> plan = testController.createPlan();
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
- auto dir1 = testController.createTempDirectory();
- auto dir2 = testController.createTempDirectory();
- auto dir3 = testController.createTempDirectory();
+ auto dir1 = testController.createTempDirectory();
+ auto dir2 = testController.createTempDirectory();
+ auto dir3 = testController.createTempDirectory();
- REQUIRE(!dir1.empty());
- REQUIRE(!dir2.empty());
- REQUIRE(!dir3.empty());
- std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
- plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory, dir1.string());
- plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::KeepSourceFile, "true");
+ REQUIRE(!dir1.empty());
+ REQUIRE(!dir2.empty());
+ REQUIRE(!dir3.empty());
+ std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
+ plan->setProperty(getfile, GetFile::Directory, dir1.string());
+ plan->setProperty(getfile, GetFile::KeepSourceFile, "true");
- std::shared_ptr<core::Processor> fprocessor = plan->addProcessor("FocusArchiveEntry", "focusarchiveCreate", core::Relationship("success", "description"), true);
- plan->setProperty(fprocessor, org::apache::nifi::minifi::processors::FocusArchiveEntry::Path, FOCUSED_FILE);
+ std::shared_ptr<core::Processor> fprocessor = plan->addProcessor("FocusArchiveEntry", "focusarchiveCreate", core::Relationship("success", "description"), true);
+ plan->setProperty(fprocessor, FocusArchiveEntry::Path, FOCUSED_FILE);
- std::shared_ptr<core::Processor> putfile1 = plan->addProcessor("PutFile", "PutFile1", core::Relationship("success", "description"), true);
- plan->setProperty(putfile1, org::apache::nifi::minifi::processors::PutFile::Directory, dir2.string());
- plan->setProperty(putfile1, org::apache::nifi::minifi::processors::PutFile::ConflictResolution,
- org::apache::nifi::minifi::processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE);
+ std::shared_ptr<core::Processor> putfile1 = plan->addProcessor("PutFile", "PutFile1", core::Relationship("success", "description"), true);
+ plan->setProperty(putfile1, PutFile::Directory, dir2.string());
+ plan->setProperty(putfile1, PutFile::ConflictResolution, magic_enum::enum_name(PutFile::FileExistsResolutionStrategy::replace));
- std::shared_ptr<core::Processor> ufprocessor = plan->addProcessor("UnfocusArchiveEntry", "unfocusarchiveCreate", core::Relationship("success", "description"), true);
+ std::shared_ptr<core::Processor> ufprocessor = plan->addProcessor("UnfocusArchiveEntry", "unfocusarchiveCreate", core::Relationship("success", "description"), true);
- std::shared_ptr<core::Processor> putfile2 = plan->addProcessor("PutFile", "PutFile2", core::Relationship("success", "description"), true);
- plan->setProperty(putfile2, org::apache::nifi::minifi::processors::PutFile::Directory, dir3.string());
- plan->setProperty(putfile2, org::apache::nifi::minifi::processors::PutFile::ConflictResolution,
- org::apache::nifi::minifi::processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE);
+ std::shared_ptr<core::Processor> putfile2 = plan->addProcessor("PutFile", "PutFile2", core::Relationship("success", "description"), true);
+ plan->setProperty(putfile2, PutFile::Directory, dir3.string());
+ plan->setProperty(putfile2, PutFile::ConflictResolution, magic_enum::enum_name(PutFile::FileExistsResolutionStrategy::replace));
- auto archive_path_1 = dir1 / TEST_ARCHIVE_NAME;
+ auto archive_path_1 = dir1 / TEST_ARCHIVE_NAME;
- TAE_MAP_T test_archive_map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT);
- build_test_archive(archive_path_1, test_archive_map);
+ TAE_MAP_T test_archive_map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT);
+ build_test_archive(archive_path_1, test_archive_map);
- REQUIRE(check_archive_contents(archive_path_1, test_archive_map));
+ REQUIRE(check_archive_contents(archive_path_1, test_archive_map));
- plan->runNextProcessor(); // GetFile
- plan->runNextProcessor(); // FocusArchive
- plan->runNextProcessor(); // PutFile 1 (focused)
+ plan->runNextProcessor(); // GetFile
+ plan->runNextProcessor(); // FocusArchive
+ plan->runNextProcessor(); // PutFile 1 (focused)
- std::ifstream ifs(dir2 / FOCUSED_FILE, std::ios::in | std::ios::binary | std::ios::ate);
+ std::ifstream ifs(dir2 / FOCUSED_FILE, std::ios::in | std::ios::binary | std::ios::ate);
- auto size = gsl::narrow<size_t>(ifs.tellg());
- ifs.seekg(0, std::ios::beg);
- char *content = new char[size];
- ifs.read(content, size);
+ auto size = gsl::narrow<size_t>(ifs.tellg());
+ ifs.seekg(0, std::ios::beg);
+ char* content = new char[size];
+ ifs.read(content, size);
- REQUIRE(size == strlen(FOCUSED_CONTENT));
- REQUIRE(memcmp(content, FOCUSED_CONTENT, size) == 0);
+ REQUIRE(size == strlen(FOCUSED_CONTENT));
+ REQUIRE(memcmp(content, FOCUSED_CONTENT, size) == 0);
- plan->runNextProcessor(); // UnfocusArchive
- plan->runNextProcessor(); // PutFile 2 (unfocused)
+ plan->runNextProcessor(); // UnfocusArchive
+ plan->runNextProcessor(); // PutFile 2 (unfocused)
- auto archive_path_2 = dir3 / TEST_ARCHIVE_NAME;
- REQUIRE(check_archive_contents(archive_path_2, test_archive_map));
+ auto archive_path_2 = dir3 / TEST_ARCHIVE_NAME;
+ REQUIRE(check_archive_contents(archive_path_2, test_archive_map));
}
+
+} // namespace org::apache::nifi::minifi::processors::test
diff --git a/libminifi/test/archive-tests/ManipulateArchiveTests.cpp b/libminifi/test/archive-tests/ManipulateArchiveTests.cpp
index 0f90567..e4db56c 100644
--- a/libminifi/test/archive-tests/ManipulateArchiveTests.cpp
+++ b/libminifi/test/archive-tests/ManipulateArchiveTests.cpp
@@ -45,55 +45,56 @@
using PROP_MAP_T = std::vector<std::pair<core::PropertyReference, std::string>>;
+namespace org::apache::nifi::minifi::processors::test {
+
bool run_archive_test(OrderedTestArchive& input_archive, const OrderedTestArchive& output_archive, const PROP_MAP_T& properties, bool check_attributes = true) {
- TestController testController;
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::FocusArchiveEntry>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::UnfocusArchiveEntry>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::ManipulateArchive>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::PutFile>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::GetFile>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
- LogTestController::getInstance().setTrace<core::ProcessSession>();
- LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::FlowFile>();
+ TestController testController;
+ LogTestController::getInstance().setTrace<FocusArchiveEntry>();
+ LogTestController::getInstance().setTrace<UnfocusArchiveEntry>();
+ LogTestController::getInstance().setTrace<ManipulateArchive>();
+ LogTestController::getInstance().setTrace<PutFile>();
+ LogTestController::getInstance().setTrace<GetFile>();
+ LogTestController::getInstance().setTrace<LogAttribute>();
+ LogTestController::getInstance().setTrace<core::ProcessSession>();
+ LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
+ LogTestController::getInstance().setTrace<Connection>();
+ LogTestController::getInstance().setTrace<core::Connectable>();
+ LogTestController::getInstance().setTrace<core::FlowFile>();
- std::shared_ptr<TestPlan> plan = testController.createPlan();
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
- auto dir1 = testController.createTempDirectory();
- auto dir2 = testController.createTempDirectory();
+ auto dir1 = testController.createTempDirectory();
+ auto dir2 = testController.createTempDirectory();
- REQUIRE(!dir1.empty());
- REQUIRE(!dir2.empty());
+ REQUIRE(!dir1.empty());
+ REQUIRE(!dir2.empty());
- std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
- plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory, dir1.string());
- plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::KeepSourceFile, "true");
+ std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
+ plan->setProperty(getfile, GetFile::Directory, dir1.string());
+ plan->setProperty(getfile, GetFile::KeepSourceFile, "true");
- std::shared_ptr<core::Processor> maprocessor = plan->addProcessor("ManipulateArchive", "testManipulateArchive", core::Relationship("success", "description"), true);
+ std::shared_ptr<core::Processor> maprocessor = plan->addProcessor("ManipulateArchive", "testManipulateArchive", core::Relationship("success", "description"), true);
- for (const auto& [name, value] : properties) {
- plan->setProperty(maprocessor, name, value);
- }
+ for (const auto& [name, value] : properties) {
+ plan->setProperty(maprocessor, name, value);
+ }
- std::shared_ptr<core::Processor> putfile2 = plan->addProcessor("PutFile", "PutFile2", core::Relationship("success", "description"), true);
- plan->setProperty(putfile2, org::apache::nifi::minifi::processors::PutFile::Directory, dir2.string());
- plan->setProperty(putfile2, org::apache::nifi::minifi::processors::PutFile::ConflictResolution,
- org::apache::nifi::minifi::processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE);
+ std::shared_ptr<core::Processor> putfile2 = plan->addProcessor("PutFile", "PutFile2", core::Relationship("success", "description"), true);
+ plan->setProperty(putfile2, PutFile::Directory, dir2.string());
+ plan->setProperty(putfile2, PutFile::ConflictResolution, magic_enum::enum_name(PutFile::FileExistsResolutionStrategy::replace));
- auto archive_path_1 = dir1 / TEST_ARCHIVE_NAME;
+ auto archive_path_1 = dir1 / TEST_ARCHIVE_NAME;
- build_test_archive(archive_path_1, input_archive);
- REQUIRE(check_archive_contents(archive_path_1, input_archive, true));
+ build_test_archive(archive_path_1, input_archive);
+ REQUIRE(check_archive_contents(archive_path_1, input_archive, true));
- plan->runNextProcessor(); // GetFile
- plan->runNextProcessor(); // ManipulateArchive
- plan->runNextProcessor(); // PutFile 2 (manipulated)
+ plan->runNextProcessor(); // GetFile
+ plan->runNextProcessor(); // ManipulateArchive
+ plan->runNextProcessor(); // PutFile 2 (manipulated)
- auto output_path = dir2 / TEST_ARCHIVE_NAME;
- return check_archive_contents(output_path, output_archive, check_attributes);
+ auto output_path = dir2 / TEST_ARCHIVE_NAME;
+ return check_archive_contents(output_path, output_archive, check_attributes);
}
bool run_archive_test(TAE_MAP_T input_map, TAE_MAP_T output_map, const PROP_MAP_T& properties, bool check_attributes = true) {
@@ -109,235 +110,236 @@
TEST_CASE("Test creation of ManipulateArchive", "[manipulatearchiveCreate]") {
TestController testController;
- std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ManipulateArchive>("processorname");
+ std::shared_ptr<core::Processor> processor = std::make_shared<ManipulateArchive>("processorname");
REQUIRE(processor->getName() == "processorname");
REQUIRE(processor->getUUID());
}
TEST_CASE("Test ManipulateArchive Touch", "[testManipulateArchiveTouch]") {
- TAE_MAP_T test_archive_map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT);
+ TAE_MAP_T test_archive_map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT);
- PROP_MAP_T properties {
- {org::apache::nifi::minifi::processors::ManipulateArchive::Destination, MODIFY_DEST},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Operation,
- org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_TOUCH}
- };
+ PROP_MAP_T properties{
+ {ManipulateArchive::Destination, MODIFY_DEST},
+ {ManipulateArchive::Operation,
+ ManipulateArchive::OPERATION_TOUCH}
+ };
- // The other attributes aren't checked, so we can leave them uninitialized
- TestArchiveEntry touched_entry;
- touched_entry.name = MODIFY_DEST;
- touched_entry.content = "";
- touched_entry.size = 0;
- touched_entry.type = AE_IFREG;
+ // The other attributes aren't checked, so we can leave them uninitialized
+ TestArchiveEntry touched_entry;
+ touched_entry.name = MODIFY_DEST;
+ touched_entry.content = "";
+ touched_entry.size = 0;
+ touched_entry.type = AE_IFREG;
- // Copy original map and append touched entry
- TAE_MAP_T mod_archive_map(test_archive_map);
- mod_archive_map[MODIFY_DEST] = touched_entry;
+ // Copy original map and append touched entry
+ TAE_MAP_T mod_archive_map(test_archive_map);
+ mod_archive_map[MODIFY_DEST] = touched_entry;
- REQUIRE(run_archive_test(test_archive_map, mod_archive_map, properties, false));
+ REQUIRE(run_archive_test(test_archive_map, mod_archive_map, properties, false));
}
TEST_CASE("Test ManipulateArchive Copy", "[testManipulateArchiveCopy]") {
- TAE_MAP_T test_archive_map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT);
+ TAE_MAP_T test_archive_map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT);
- PROP_MAP_T properties {
- {org::apache::nifi::minifi::processors::ManipulateArchive::Target, MODIFY_SRC},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Destination, MODIFY_DEST},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Operation,
- org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_COPY}
- };
+ PROP_MAP_T properties{
+ {ManipulateArchive::Target, MODIFY_SRC},
+ {ManipulateArchive::Destination, MODIFY_DEST},
+ {ManipulateArchive::Operation,
+ ManipulateArchive::OPERATION_COPY}
+ };
- TAE_MAP_T mod_archive_map(test_archive_map);
- mod_archive_map[MODIFY_DEST] = test_archive_map[MODIFY_SRC];
+ TAE_MAP_T mod_archive_map(test_archive_map);
+ mod_archive_map[MODIFY_DEST] = test_archive_map[MODIFY_SRC];
- REQUIRE(run_archive_test(test_archive_map, mod_archive_map, properties));
+ REQUIRE(run_archive_test(test_archive_map, mod_archive_map, properties));
}
TEST_CASE("Test ManipulateArchive Move", "[testManipulateArchiveMove]") {
- TAE_MAP_T test_archive_map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT);
+ TAE_MAP_T test_archive_map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT);
- PROP_MAP_T properties {
- {org::apache::nifi::minifi::processors::ManipulateArchive::Target, MODIFY_SRC},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Destination, MODIFY_DEST},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Operation,
- org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_MOVE}
- };
+ PROP_MAP_T properties{
+ {ManipulateArchive::Target, MODIFY_SRC},
+ {ManipulateArchive::Destination, MODIFY_DEST},
+ {ManipulateArchive::Operation,
+ ManipulateArchive::OPERATION_MOVE}
+ };
- TAE_MAP_T mod_archive_map(test_archive_map);
+ TAE_MAP_T mod_archive_map(test_archive_map);
- mod_archive_map[MODIFY_DEST] = test_archive_map[MODIFY_SRC];
- mod_archive_map[MODIFY_DEST].name = MODIFY_DEST;
+ mod_archive_map[MODIFY_DEST] = test_archive_map[MODIFY_SRC];
+ mod_archive_map[MODIFY_DEST].name = MODIFY_DEST;
- auto it = mod_archive_map.find(MODIFY_SRC);
- mod_archive_map.erase(it);
+ auto it = mod_archive_map.find(MODIFY_SRC);
+ mod_archive_map.erase(it);
- REQUIRE(run_archive_test(test_archive_map, mod_archive_map, properties));
+ REQUIRE(run_archive_test(test_archive_map, mod_archive_map, properties));
}
TEST_CASE("Test ManipulateArchive Remove", "[testManipulateArchiveRemove]") {
- TAE_MAP_T test_archive_map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT);
+ TAE_MAP_T test_archive_map = build_test_archive_map(NUM_FILES, FILE_NAMES, FILE_CONTENT);
- PROP_MAP_T properties {
- {org::apache::nifi::minifi::processors::ManipulateArchive::Target, MODIFY_SRC},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Operation,
- org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_REMOVE}
- };
+ PROP_MAP_T properties{
+ {ManipulateArchive::Target, MODIFY_SRC},
+ {ManipulateArchive::Operation,
+ ManipulateArchive::OPERATION_REMOVE}
+ };
- TAE_MAP_T mod_archive_map(test_archive_map);
+ TAE_MAP_T mod_archive_map(test_archive_map);
- auto it = mod_archive_map.find(MODIFY_SRC);
- mod_archive_map.erase(it);
+ auto it = mod_archive_map.find(MODIFY_SRC);
+ mod_archive_map.erase(it);
- REQUIRE(run_archive_test(test_archive_map, mod_archive_map, properties));
+ REQUIRE(run_archive_test(test_archive_map, mod_archive_map, properties));
}
TEST_CASE("Test ManipulateArchive Ordered Touch (before)", "[testManipulateArchiveOrderedTouchBefore]") {
- OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT);
+ OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT);
- PROP_MAP_T properties {
- {org::apache::nifi::minifi::processors::ManipulateArchive::Destination, MODIFY_DEST},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Operation,
- org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_TOUCH},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Before, ORDER_ANCHOR}
- };
+ PROP_MAP_T properties{
+ {ManipulateArchive::Destination, MODIFY_DEST},
+ {ManipulateArchive::Operation,
+ ManipulateArchive::OPERATION_TOUCH},
+ {ManipulateArchive::Before, ORDER_ANCHOR}
+ };
- // The other attributes aren't checked, so we can leave them uninitialized
- TestArchiveEntry touched_entry;
- touched_entry.name = MODIFY_DEST;
- touched_entry.content = "";
- touched_entry.size = 0;
- touched_entry.type = AE_IFREG;
+ // The other attributes aren't checked, so we can leave them uninitialized
+ TestArchiveEntry touched_entry;
+ touched_entry.name = MODIFY_DEST;
+ touched_entry.content = "";
+ touched_entry.size = 0;
+ touched_entry.type = AE_IFREG;
- // Copy original map and append touched entry
- OrderedTestArchive mod_archive = test_archive;
- mod_archive.map[MODIFY_DEST] = touched_entry;
+ // Copy original map and append touched entry
+ OrderedTestArchive mod_archive = test_archive;
+ mod_archive.map[MODIFY_DEST] = touched_entry;
- auto it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR);
- mod_archive.order.insert(it, MODIFY_DEST);
+ auto it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR);
+ mod_archive.order.insert(it, MODIFY_DEST);
- REQUIRE(run_archive_test(test_archive, mod_archive, properties, false));
+ REQUIRE(run_archive_test(test_archive, mod_archive, properties, false));
}
TEST_CASE("Test ManipulateArchive Ordered Copy (before)", "[testManipulateArchiveOrderedCopyBefore]") {
- OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT);
+ OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT);
- PROP_MAP_T properties {
- {org::apache::nifi::minifi::processors::ManipulateArchive::Target, MODIFY_SRC},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Destination, MODIFY_DEST},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Operation,
- org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_COPY},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Before, ORDER_ANCHOR}
- };
+ PROP_MAP_T properties{
+ {ManipulateArchive::Target, MODIFY_SRC},
+ {ManipulateArchive::Destination, MODIFY_DEST},
+ {ManipulateArchive::Operation,
+ ManipulateArchive::OPERATION_COPY},
+ {ManipulateArchive::Before, ORDER_ANCHOR}
+ };
- OrderedTestArchive mod_archive = test_archive;
- mod_archive.map[MODIFY_DEST] = test_archive.map[MODIFY_SRC];
- auto it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR);
- mod_archive.order.insert(it, MODIFY_DEST);
+ OrderedTestArchive mod_archive = test_archive;
+ mod_archive.map[MODIFY_DEST] = test_archive.map[MODIFY_SRC];
+ auto it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR);
+ mod_archive.order.insert(it, MODIFY_DEST);
- REQUIRE(run_archive_test(test_archive, mod_archive, properties));
+ REQUIRE(run_archive_test(test_archive, mod_archive, properties));
}
TEST_CASE("Test ManipulateArchive Ordered Move (before)", "[testManipulateArchiveOrderedMoveBefore]") {
- OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT);
+ OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT);
- PROP_MAP_T properties {
- {org::apache::nifi::minifi::processors::ManipulateArchive::Target, MODIFY_SRC},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Destination, MODIFY_DEST},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Operation,
- org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_MOVE},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Before, ORDER_ANCHOR}
- };
+ PROP_MAP_T properties{
+ {ManipulateArchive::Target, MODIFY_SRC},
+ {ManipulateArchive::Destination, MODIFY_DEST},
+ {ManipulateArchive::Operation,
+ ManipulateArchive::OPERATION_MOVE},
+ {ManipulateArchive::Before, ORDER_ANCHOR}
+ };
- OrderedTestArchive mod_archive = test_archive;
+ OrderedTestArchive mod_archive = test_archive;
- // Update map
- mod_archive.map[MODIFY_DEST] = test_archive.map[MODIFY_SRC];
- mod_archive.map[MODIFY_DEST].name = MODIFY_DEST;
- auto m_it = mod_archive.map.find(MODIFY_SRC);
- mod_archive.map.erase(m_it);
+ // Update map
+ mod_archive.map[MODIFY_DEST] = test_archive.map[MODIFY_SRC];
+ mod_archive.map[MODIFY_DEST].name = MODIFY_DEST;
+ auto m_it = mod_archive.map.find(MODIFY_SRC);
+ mod_archive.map.erase(m_it);
- // Update order
- auto o_it = std::find(mod_archive.order.begin(), mod_archive.order.end(), MODIFY_SRC);
- mod_archive.order.erase(o_it);
- o_it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR);
- mod_archive.order.insert(o_it, MODIFY_DEST);
+ // Update order
+ auto o_it = std::find(mod_archive.order.begin(), mod_archive.order.end(), MODIFY_SRC);
+ mod_archive.order.erase(o_it);
+ o_it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR);
+ mod_archive.order.insert(o_it, MODIFY_DEST);
- REQUIRE(run_archive_test(test_archive, mod_archive, properties));
+ REQUIRE(run_archive_test(test_archive, mod_archive, properties));
}
TEST_CASE("Test ManipulateArchive Ordered Touch (after)", "[testManipulateArchiveOrderedTouchAfter]") {
- OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT);
+ OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT);
- PROP_MAP_T properties {
- {org::apache::nifi::minifi::processors::ManipulateArchive::Destination, MODIFY_DEST},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Operation,
- org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_TOUCH},
- {org::apache::nifi::minifi::processors::ManipulateArchive::After, ORDER_ANCHOR}
- };
+ PROP_MAP_T properties{
+ {ManipulateArchive::Destination, MODIFY_DEST},
+ {ManipulateArchive::Operation,
+ ManipulateArchive::OPERATION_TOUCH},
+ {ManipulateArchive::After, ORDER_ANCHOR}
+ };
- // The other attributes aren't checked, so we can leave them uninitialized
- TestArchiveEntry touched_entry;
- touched_entry.name = MODIFY_DEST;
- touched_entry.content = "";
- touched_entry.size = 0;
- touched_entry.type = AE_IFREG;
+ // The other attributes aren't checked, so we can leave them uninitialized
+ TestArchiveEntry touched_entry;
+ touched_entry.name = MODIFY_DEST;
+ touched_entry.content = "";
+ touched_entry.size = 0;
+ touched_entry.type = AE_IFREG;
- // Copy original map and append touched entry
- OrderedTestArchive mod_archive = test_archive;
- mod_archive.map[MODIFY_DEST] = touched_entry;
+ // Copy original map and append touched entry
+ OrderedTestArchive mod_archive = test_archive;
+ mod_archive.map[MODIFY_DEST] = touched_entry;
- auto it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR);
- it++;
- mod_archive.order.insert(it, MODIFY_DEST);
+ auto it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR);
+ it++;
+ mod_archive.order.insert(it, MODIFY_DEST);
- REQUIRE(run_archive_test(test_archive, mod_archive, properties, false));
+ REQUIRE(run_archive_test(test_archive, mod_archive, properties, false));
}
TEST_CASE("Test ManipulateArchive Ordered Copy (after)", "[testManipulateArchiveOrderedCopyAfter]") {
- OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT);
+ OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT);
- PROP_MAP_T properties {
- {org::apache::nifi::minifi::processors::ManipulateArchive::Target, MODIFY_SRC},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Destination, MODIFY_DEST},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Operation,
- org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_COPY},
- {org::apache::nifi::minifi::processors::ManipulateArchive::After, ORDER_ANCHOR}
- };
+ PROP_MAP_T properties{
+ {ManipulateArchive::Target, MODIFY_SRC},
+ {ManipulateArchive::Destination, MODIFY_DEST},
+ {ManipulateArchive::Operation,
+ ManipulateArchive::OPERATION_COPY},
+ {ManipulateArchive::After, ORDER_ANCHOR}
+ };
- OrderedTestArchive mod_archive = test_archive;
- mod_archive.map[MODIFY_DEST] = test_archive.map[MODIFY_SRC];
- auto it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR);
- it++;
- mod_archive.order.insert(it, MODIFY_DEST);
+ OrderedTestArchive mod_archive = test_archive;
+ mod_archive.map[MODIFY_DEST] = test_archive.map[MODIFY_SRC];
+ auto it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR);
+ it++;
+ mod_archive.order.insert(it, MODIFY_DEST);
- REQUIRE(run_archive_test(test_archive, mod_archive, properties));
+ REQUIRE(run_archive_test(test_archive, mod_archive, properties));
}
TEST_CASE("Test ManipulateArchive Ordered Move (after)", "[testManipulateArchiveOrderedMoveAfter]") {
- OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT);
+ OrderedTestArchive test_archive = build_ordered_test_archive(NUM_FILES, FILE_NAMES, FILE_CONTENT);
- PROP_MAP_T properties {
- {org::apache::nifi::minifi::processors::ManipulateArchive::Target, MODIFY_SRC},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Destination, MODIFY_DEST},
- {org::apache::nifi::minifi::processors::ManipulateArchive::Operation,
- org::apache::nifi::minifi::processors::ManipulateArchive::OPERATION_MOVE},
- {org::apache::nifi::minifi::processors::ManipulateArchive::After, ORDER_ANCHOR}
- };
+ PROP_MAP_T properties{
+ {ManipulateArchive::Target, MODIFY_SRC},
+ {ManipulateArchive::Destination, MODIFY_DEST},
+ {ManipulateArchive::Operation, ManipulateArchive::OPERATION_MOVE},
+ {ManipulateArchive::After, ORDER_ANCHOR}
+ };
- OrderedTestArchive mod_archive = test_archive;
+ OrderedTestArchive mod_archive = test_archive;
- // Update map
- mod_archive.map[MODIFY_DEST] = test_archive.map[MODIFY_SRC];
- mod_archive.map[MODIFY_DEST].name = MODIFY_DEST;
- auto m_it = mod_archive.map.find(MODIFY_SRC);
- mod_archive.map.erase(m_it);
+ // Update map
+ mod_archive.map[MODIFY_DEST] = test_archive.map[MODIFY_SRC];
+ mod_archive.map[MODIFY_DEST].name = MODIFY_DEST;
+ auto m_it = mod_archive.map.find(MODIFY_SRC);
+ mod_archive.map.erase(m_it);
- // Update order
- auto o_it = std::find(mod_archive.order.begin(), mod_archive.order.end(), MODIFY_SRC);
- mod_archive.order.erase(o_it);
- o_it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR);
- o_it++;
- mod_archive.order.insert(o_it, MODIFY_DEST);
+ // Update order
+ auto o_it = std::find(mod_archive.order.begin(), mod_archive.order.end(), MODIFY_SRC);
+ mod_archive.order.erase(o_it);
+ o_it = std::find(mod_archive.order.begin(), mod_archive.order.end(), ORDER_ANCHOR);
+ o_it++;
+ mod_archive.order.insert(o_it, MODIFY_DEST);
- REQUIRE(run_archive_test(test_archive, mod_archive, properties));
+ REQUIRE(run_archive_test(test_archive, mod_archive, properties));
}
+
+} // namespace org::apache::nifi::minifi::processors::test
diff --git a/libminifi/test/unit/OsUtilTests.cpp b/libminifi/test/unit/OsUtilTests.cpp
index a7777d1..15c4628 100644
--- a/libminifi/test/unit/OsUtilTests.cpp
+++ b/libminifi/test/unit/OsUtilTests.cpp
@@ -39,6 +39,26 @@
CHECK_FALSE(minifi::utils::OsUtils::userIdToUsername("S-1-3-4").empty());
CHECK_FALSE(minifi::utils::OsUtils::userIdToUsername("S-1-5-80-0").empty());
}
+
+TEST_CASE("OsUtils::stringToWideString tests") {
+ using org::apache::nifi::minifi::utils::OsUtils::stringToWideString;
+
+ CHECK(stringToWideString("árvíztűrő tükörfúrógép") == L"árvíztűrő tükörfúrógép");
+ CHECK(stringToWideString("Falsches Üben von Xylophonmusik quält jeden größeren Zwerg.") == L"Falsches Üben von Xylophonmusik quält jeden größeren Zwerg.");
+ CHECK(stringToWideString("가나다라마바사아자차카타파하") == L"가나다라마바사아자차카타파하");
+ CHECK(stringToWideString("العربية تجربة") == L"العربية تجربة");
+ CHECK(stringToWideString("פטכןצימסעואבגדהוזחטייכלמנסעפצקרשת") == L"פטכןצימסעואבגדהוזחטייכלמנסעפצקרשת");
+}
+
+TEST_CASE("OsUtils::wideStringToString tests") {
+ using org::apache::nifi::minifi::utils::OsUtils::wideStringToString;
+
+ CHECK(wideStringToString(L"árvíztűrő tükörfúrógép") == "árvíztűrő tükörfúrógép");
+ CHECK(wideStringToString(L"Falsches Üben von Xylophonmusik quält jeden größeren Zwerg.") == "Falsches Üben von Xylophonmusik quält jeden größeren Zwerg.");
+ CHECK(wideStringToString(L"가나다라마바사아자차카타파하") == "가나다라마바사아자차카타파하");
+ CHECK(wideStringToString(L"العربية تجربة") == "العربية تجربة");
+ CHECK(wideStringToString(L"פטכןצימסעואבגדהוזחטייכלמנסעפצקרשת") == "פטכןצימסעואבגדהוזחטייכלמנסעפצקרשת");
+}
#endif
TEST_CASE("Machine architecture is supported") {
diff --git a/libminifi/test/unit/TimeUtilTests.cpp b/libminifi/test/unit/TimeUtilTests.cpp
index 2e56ed0..9b9eb72 100644
--- a/libminifi/test/unit/TimeUtilTests.cpp
+++ b/libminifi/test/unit/TimeUtilTests.cpp
@@ -117,6 +117,30 @@
REQUIRE(unix_epoch_plus_3e9_sec.time_since_epoch() == 3000000000s);
}
+#ifdef WIN32
+TEST_CASE("Test windows file_clock duration period and epoch") {
+ using namespace std::chrono;
+ static_assert(std::ratio_equal_v<std::chrono::file_clock::duration::period, std::ratio<1, 10000000>>, "file_clock duration tick period must be 100 nanoseconds");
+ auto file_clock_epoch = std::chrono::file_clock::time_point{};
+ auto file_clock_epoch_as_sys_time = utils::file::to_sys(file_clock_epoch);
+ system_clock::time_point expected_windows_file_epoch = date::sys_days(date::January / 1 / 1601);
+ CHECK(file_clock_epoch_as_sys_time == expected_windows_file_epoch);
+}
+
+TEST_CASE("Test windows FILETIME epoch") {
+ SYSTEMTIME system_time;
+ FILETIME file_time{.dwLowDateTime = 0, .dwHighDateTime = 0};
+ FileTimeToSystemTime(&file_time, &system_time);
+ CHECK(system_time.wYear == 1601);
+ CHECK(system_time.wMonth == 1);
+ CHECK(system_time.wDay == 1);
+ CHECK(system_time.wHour == 0);
+ CHECK(system_time.wMinute == 0);
+ CHECK(system_time.wSecond == 0);
+ CHECK(system_time.wMilliseconds == 0);
+}
+#endif
+
TEST_CASE("Test clock resolutions", "[clockresolutiontests]") {
using namespace std::chrono;
CHECK(std::is_constructible<system_clock::duration, std::chrono::microseconds>::value); // The resolution of the system_clock is at least microseconds
diff --git a/run_clang_tidy.sh b/run_clang_tidy.sh
index 79bab75..9600227 100755
--- a/run_clang_tidy.sh
+++ b/run_clang_tidy.sh
@@ -4,7 +4,7 @@
FILE=$1
-EXCLUDED_DIRECTORY=("extensions/pdh" "extensions/windows-event-log" "nanofi")
+EXCLUDED_DIRECTORY=("extensions/pdh" "extensions/windows-event-log" "extensions/smb" "nanofi")
EXCLUDED_FILES=("WindowsCertStoreLocationTests.cpp")
for excluded_file in "${EXCLUDED_FILES[@]}"; do
diff --git a/win_build_vs.bat b/win_build_vs.bat
index fada62d..2f935e3 100755
--- a/win_build_vs.bat
+++ b/win_build_vs.bat
@@ -37,6 +37,9 @@
set enable_lua_scripting=ON
set enable_mqtt=ON
set enable_opc=ON
+set enable_pdh=OFF
+set enable_splunk=ON
+set enable_smb=ON
set enable_openwsman=OFF
set enable_ops=ON
set enable_pcap=OFF
@@ -70,6 +73,7 @@
if [%%~x] EQU [/SFTP] set enable_sftp=ON
if [%%~x] EQU [/PDH] set enable_pdh=ON
if [%%~x] EQU [/NO_SPLUNK] set enable_splunk=OFF
+ if [%%~x] EQU [/NO_SMB] set enable_smb=OFF
if [%%~x] EQU [/NO_GCP] set enable_gcp=OFF
if [%%~x] EQU [/NO_ELASTIC] set enable_elastic=OFF
if [%%~x] EQU [/M] set installer_merge_modules=ON
@@ -113,7 +117,7 @@
-DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% -DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%enable_kafka% -DENABLE_JNI=%enable_jni% -DOPENSSL_OFF=OFF ^
-DENABLE_COAP=%enable_coap% -DENABLE_AWS=%enable_aws% -DENABLE_PDH=%enable_pdh% -DENABLE_AZURE=%enable_azure% -DENABLE_SFTP=%enable_sftp% -DENABLE_SPLUNK=%enable_splunk% -DENABLE_GCP=%enable_gcp% ^
-DENABLE_NANOFI=%enable_nanofi% -DENABLE_OPENCV=%enable_opencv% -DENABLE_PROMETHEUS=%enable_prometheus% -DENABLE_ELASTICSEARCH=%enable_elastic% -DUSE_SHARED_LIBS=OFF -DENABLE_CONTROLLER=OFF ^
- -DENABLE_BUSTACHE=%enable_bustache% -DENABLE_ENCRYPT_CONFIG=%enable_encrypt_config% -DENABLE_LUA_SCRIPTING=%enable_lua_scripting% ^
+ -DENABLE_BUSTACHE=%enable_bustache% -DENABLE_ENCRYPT_CONFIG=%enable_encrypt_config% -DENABLE_LUA_SCRIPTING=%enable_lua_scripting% -DENABLE_SMB=%enable_smb% ^
-DENABLE_MQTT=%enable_mqtt% -DENABLE_OPC=%enable_opc% -DENABLE_OPENWSMAN=%enable_openwsman% -DENABLE_OPS=%enable_ops% -DENABLE_PCAP=%enable_pcap% ^
-DENABLE_PYTHON_SCRIPTING=%enable_python_scripting% -DENABLE_SENSORS=%enable_sensors% -DENABLE_USB_CAMERA=%enable_usb_camera% ^
-DBUILD_ROCKSDB=ON -DFORCE_WINDOWS=ON -DUSE_SYSTEM_UUID=OFF -DDISABLE_LIBARCHIVE=OFF -DENABLE_WEL=ON -DFAIL_ON_WARNINGS=OFF -DSKIP_TESTS=%skiptests% ^