blob: d3ee16d9c6f28585a7cda65a174600f2a6aee768 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sys/stat.h>
#include <cstring>
#include <utility>
#include <chrono>
#include <fstream>
#include <memory>
#include <string>
#include <thread>
#include <type_traits>
#include <vector>
#include <iostream>
#include <algorithm>
#include <functional>
#include <iterator>
#include <random>
#ifndef WIN32
#include <unistd.h>
#endif
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "utils/file/FileUtils.h"
#include "core/Core.h"
#include "core/logging/Logger.h"
#include "core/ProcessGroup.h"
#include "FlowController.h"
#include "properties/Configure.h"
#include "unit/ProvenanceTestHelper.h"
#include "processors/ListSFTP.h"
#include "processors/GenerateFlowFile.h"
#include "processors/LogAttribute.h"
#include "tools/SFTPTestServer.h"
#include "unit/TestUtils.h"
using namespace std::literals::chrono_literals;
class ListSFTPTestsFixture {
public:
explicit ListSFTPTestsFixture(const std::shared_ptr<minifi::Configure>& configuration = nullptr) {
LogTestController::getInstance().reset();
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setDebug<minifi::FlowController>();
LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
LogTestController::getInstance().setDebug<minifi::core::ProcessGroup>();
LogTestController::getInstance().setDebug<minifi::core::Processor>();
LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
LogTestController::getInstance().setDebug<minifi::processors::GenerateFlowFile>();
LogTestController::getInstance().setTrace<minifi::utils::SFTPClient>();
LogTestController::getInstance().setTrace<minifi::processors::ListSFTP>();
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
LogTestController::getInstance().setDebug<SFTPTestServer>();
REQUIRE_FALSE(working_directory.empty());
// Start SFTP server
sftp_server = std::make_unique<SFTPTestServer>(working_directory);
REQUIRE(true == sftp_server->start());
// Build MiNiFi processing graph
createPlan(nullptr, configuration);
}
ListSFTPTestsFixture(ListSFTPTestsFixture&&) = delete;
ListSFTPTestsFixture(const ListSFTPTestsFixture&) = delete;
ListSFTPTestsFixture& operator=(ListSFTPTestsFixture&&) = delete;
ListSFTPTestsFixture& operator=(const ListSFTPTestsFixture&) = delete;
virtual ~ListSFTPTestsFixture() = default;
void createPlan(const utils::Identifier* list_sftp_uuid = nullptr, const std::shared_ptr<minifi::Configure>& configuration = nullptr) {
const auto state_dir = plan == nullptr ? testController.createTempDirectory() : plan->getStateDir();
log_attribute = nullptr;
list_sftp = nullptr;
plan.reset();
plan = testController.createPlan(configuration, state_dir);
if (list_sftp_uuid == nullptr) {
list_sftp = plan->addProcessor(
"ListSFTP",
"ListSFTP");
} else {
list_sftp = plan->addProcessor(
"ListSFTP",
*list_sftp_uuid,
"ListSFTP",
{core::Relationship("success", "d")});
}
log_attribute = plan->addProcessor("LogAttribute",
"LogAttribute",
core::Relationship("success", "d"),
true);
// Configure ListSFTP processor
plan->setProperty(list_sftp, "Listing Strategy", minifi::processors::ListSFTP::LISTING_STRATEGY_TRACKING_TIMESTAMPS);
plan->setProperty(list_sftp, "Hostname", "localhost");
plan->setProperty(list_sftp, "Port", std::to_string(sftp_server->getPort()));
plan->setProperty(list_sftp, "Username", "nifiuser");
plan->setProperty(list_sftp, "Password", "nifipassword");
plan->setProperty(list_sftp, "Search Recursively", "false");
plan->setProperty(list_sftp, "Follow symlink", "false");
plan->setProperty(list_sftp, "Ignore Dotted Files", "false");
plan->setProperty(list_sftp, "Strict Host Key Checking", "false");
plan->setProperty(list_sftp, "Connection Timeout", "30 sec");
plan->setProperty(list_sftp, "Data Timeout", "30 sec");
plan->setProperty(list_sftp, "Send Keep Alive On Timeout", "true");
plan->setProperty(list_sftp, "Target System Timestamp Precision", minifi::processors::ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT);
plan->setProperty(list_sftp, "Minimum File Age", "0 sec");
plan->setProperty(list_sftp, "Minimum File Size", "0 B");
plan->setProperty(list_sftp, "Target System Timestamp Precision", "Seconds");
plan->setProperty(list_sftp, "Remote Path", "nifi_test/");
// Configure LogAttribute processor
plan->setProperty(log_attribute, "FlowFiles To Log", "0");
}
// Create source file
void createFile(const std::filesystem::path& relative_path, const std::string& content, const std::optional<std::chrono::file_clock::time_point>& modification_time) const {
std::fstream file;
const auto full_path = working_directory / "vfs" / relative_path;
std::filesystem::create_directories(full_path.parent_path());
file.open(full_path, std::ios::out);
file << content;
file.close();
if (modification_time.has_value()) {
REQUIRE(utils::file::set_last_write_time(full_path, modification_time.value()));
}
}
void createFileWithModificationTimeDiff(const std::filesystem::path& relative_path, const std::string& content, std::chrono::seconds modification_timediff = -5min) const {
createFile(relative_path, content, std::chrono::file_clock::now() + modification_timediff);
}
protected:
TestController testController;
std::filesystem::path working_directory = testController.createTempDirectory();
std::shared_ptr<TestPlan> plan;
std::unique_ptr<SFTPTestServer> sftp_server;
core::Processor* list_sftp = nullptr;
core::Processor* log_attribute = nullptr;
};
class PersistentListSFTPTestsFixture : public ListSFTPTestsFixture {
public:
PersistentListSFTPTestsFixture() :
ListSFTPTestsFixture(std::make_shared<minifi::ConfigureImpl>()) {
}
};
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list one file", "[ListSFTP][basic]") {
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP public key authentication", "[ListSFTP][basic]") {
plan->setProperty(list_sftp, "Remote File", "nifi_test/tstFile.ext");
plan->setProperty(list_sftp, "Private Key Path", (get_sftp_test_dir() / "resources" / "id_rsa").string());
plan->setProperty(list_sftp, "Private Key Passphrase", "privatekeypassword");
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
TestController::runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Successfully authenticated with publickey"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list non-existing dir", "[ListSFTP][basic]") {
plan->setProperty(list_sftp, "Remote Path", "nifi_test2/");
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("Failed to open remote directory \"nifi_test2\", error: LIBSSH2_FX_NO_SUCH_FILE"));
REQUIRE(LogTestController::getInstance().contains("There are no files to list. Yielding."));
}
#ifndef WIN32
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list non-readable dir", "[ListSFTP][basic]") {
if (getuid() == 0) {
std::cerr << "!!!! This test does NOT work as root. Exiting. !!!!" << std::endl;
return;
}
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
std::filesystem::permissions(working_directory / "vfs" / "nifi_test", static_cast<std::filesystem::perms>(0000));
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("Failed to open remote directory \"nifi_test\", error: LIBSSH2_FX_PERMISSION_DENIED"));
REQUIRE(LogTestController::getInstance().contains("There are no files to list. Yielding."));
std::filesystem::permissions(working_directory / "vfs" / "nifi_test", static_cast<std::filesystem::perms>(0755));
}
#endif
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list one file writes attributes", "[ListSFTP][basic]") {
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
auto file = working_directory / "vfs" / "nifi_test" / "tstFile.ext";
auto mtime_str = utils::timeutils::getDateTimeStr(std::chrono::time_point_cast<std::chrono::seconds>(utils::file::to_sys(utils::file::last_write_time(file).value())));
uint64_t uid = 0;
uint64_t gid = 0;
REQUIRE(true == utils::file::FileUtils::get_uid_gid(file, uid, gid));
uint32_t permissions = 0;
REQUIRE(true == utils::file::FileUtils::get_permissions(file, permissions));
std::stringstream permissions_ss;
permissions_ss << std::setfill('0') << std::setw(4) << std::oct << permissions;
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
REQUIRE(LogTestController::getInstance().contains("key:sftp.listing.user value:nifiuser"));
REQUIRE(LogTestController::getInstance().contains("key:file.owner value:" + std::to_string(uid)));
REQUIRE(LogTestController::getInstance().contains("key:file.group value:" + std::to_string(gid)));
REQUIRE(LogTestController::getInstance().contains("key:file.permissions value:" + permissions_ss.str()));
REQUIRE(LogTestController::getInstance().contains("key:file.size value:14"));
REQUIRE(LogTestController::getInstance().contains("key:file.lastModifiedTime value:" + mtime_str));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list two files", "[ListSFTP][basic]") {
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test with longer content 2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list two files one in a subdir no recursion", "[ListSFTP][basic]") {
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
createFileWithModificationTimeDiff("nifi_test/subdir/file2.ext", "Test with longer content 2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list two files one in a subdir with recursion", "[ListSFTP][basic]") {
plan->setProperty(list_sftp, "Search Recursively", "true");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
createFileWithModificationTimeDiff("nifi_test/subdir/file2.ext", "Test with longer content 2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Minimum File Age too young", "[ListSFTP][file-age]") {
plan->setProperty(list_sftp, "Minimum File Age", "2 hours");
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/tstFile.ext\" because it is younger than the Minimum File Age"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Maximum File Age too old", "[ListSFTP][file-age]") {
plan->setProperty(list_sftp, "Maximum File Age", "1 min");
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/tstFile.ext\" because it is older than the Maximum File Age"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Minimum File Size too small", "[ListSFTP][file-size]") {
plan->setProperty(list_sftp, "Minimum File Size", "1 MB");
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/tstFile.ext\" because it is smaller than the Minimum File Size: 14 B < 1048576 B"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Maximum File Size too large", "[ListSFTP][file-size]") {
plan->setProperty(list_sftp, "Maximum File Size", "4 B");
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/tstFile.ext\" because it is larger than the Maximum File Size: 14 B > 4 B"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP File Filter Regex", "[ListSFTP][file-filter-regex]") {
plan->setProperty(list_sftp, "File Filter Regex", "^.*2.*$");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test with longer content 2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/file1.ext\" because it did not match the File Filter Regex \"^.*2.*$\""));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Path Filter Regex", "[ListSFTP][path-filter-regex]") {
plan->setProperty(list_sftp, "Search Recursively", "true");
plan->setProperty(list_sftp, "Path Filter Regex", "^.*foobar.*$");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
createFileWithModificationTimeDiff("nifi_test/foobar/file2.ext", "Test content 2");
createFileWithModificationTimeDiff("nifi_test/notbar/file3.ext", "Test with longer content 3");
testController.runSession(plan, true);
/* file1.ext is in the root */
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
/* file2.ext is in a matching subdirectory */
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
/* file3.ext is in a non-matching subdirectory */
REQUIRE(LogTestController::getInstance().contains("Not recursing into \"nifi_test/notbar\" because it did not match the Path Filter Regex \"^.*foobar.*$\""));
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file3.ext"));
}
#ifndef WIN32
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink false file symlink", "[ListSFTP][follow-symlink]") {
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
std::filesystem::create_symlink(working_directory / "vfs" / "nifi_test" / "file1.ext", working_directory / "vfs" / "nifi_test" / "file2.ext");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("Skipping non-regular, non-directory file \"nifi_test/file2.ext\""));
}
#endif
#ifndef WIN32
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink true file symlink", "[ListSFTP][follow-symlink]") {
plan->setProperty(list_sftp, "Follow symlink", "true");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
std::filesystem::create_symlink(working_directory / "vfs" / "nifi_test" / "file1.ext", working_directory / "vfs" / "nifi_test" / "file2.ext");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
#endif
#ifndef WIN32
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink false directory symlink", "[ListSFTP][follow-symlink]") {
plan->setProperty(list_sftp, "Search Recursively", "true");
createFileWithModificationTimeDiff("nifi_test/dir1/file1.ext", "Test content 1");
std::filesystem::create_directory_symlink(working_directory / "vfs" / "nifi_test" / "dir1", working_directory / "vfs" / "nifi_test" / "dir2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("Skipping non-regular, non-directory file \"nifi_test/dir2\""));
}
#endif
#ifndef WIN32
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink true directory symlink", "[ListSFTP][follow-symlink]") {
plan->setProperty(list_sftp, "Search Recursively", "true");
plan->setProperty(list_sftp, "Follow symlink", "true");
createFileWithModificationTimeDiff("nifi_test/dir1/file1.ext", "Test content 1");
std::filesystem::create_directory_symlink(working_directory / "vfs" / "nifi_test" / "dir1", working_directory / "vfs" / "nifi_test" / "dir2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/dir1"));
REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/dir2"));
}
#endif
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file", "[ListSFTP][tracking-timestamps]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
plan->reset();
LogTestController::getInstance().clear();
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("The latest listed entry timestamp is the same as the last listed entry timestamp"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file one new file", "[ListSFTP][tracking-timestamps]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().clear();
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4min);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file one older file", "[ListSFTP][tracking-timestamps]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().clear();
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -6min);
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("Skipping \"nifi_test/file2.ext\", because it is not new."));
REQUIRE(LogTestController::getInstance().contains("The latest listed entry timestamp is the same as the last listed entry timestamp"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file another file with the same timestamp", "[ListSFTP][tracking-timestamps]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
auto mtime = utils::file::last_write_time(working_directory / "vfs" / "nifi_test" / "file1.ext").value();
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().clear();
/* We must sleep to avoid triggering the listing lag. */
std::this_thread::sleep_for(std::chrono::milliseconds(1500));
createFile("nifi_test/file2.ext", "Test content 2", mtime);
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file timestamp updated", "[ListSFTP][tracking-timestamps]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
auto file = working_directory / "vfs" / "nifi_test" / "file1.ext";
auto mtime = utils::file::last_write_time(file).value();
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().clear();
REQUIRE(utils::file::set_last_write_time(file, mtime + 1s));
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().clear();
/* We must sleep to avoid triggering the listing lag. */
std::this_thread::sleep_for(std::chrono::milliseconds(1500));
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("and all files for that timestamp has been processed. Yielding."));
}
TEST_CASE_METHOD(PersistentListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore state", "[ListSFTP][tracking-timestamps]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
std::unordered_map<std::string, std::string> state;
REQUIRE(plan->getProcessContextForProcessor(list_sftp)->getStateManager()->get(state));
REQUIRE("localhost" == state.at("hostname"));
REQUIRE("nifiuser" == state.at("username"));
REQUIRE("nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
REQUIRE("nifi_test/file1.ext" == state.at("id.0"));
REQUIRE(!state.at("listing.timestamp").empty());
REQUIRE(!state.at("processed.timestamp").empty());
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
utils::Identifier list_sftp_uuid = list_sftp->getUUID();
REQUIRE(list_sftp_uuid);
createPlan(&list_sftp_uuid, std::make_shared<minifi::ConfigureImpl>());
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
LogTestController::getInstance().clear();
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4min);
testController.runSession(plan, true);
REQUIRE(plan->getProcessContextForProcessor(list_sftp)->getStateManager()->get(state));
REQUIRE("localhost" == state.at("hostname"));
REQUIRE("nifiuser" == state.at("username"));
REQUIRE("nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
REQUIRE("nifi_test/file2.ext" == state.at("id.0"));
REQUIRE(!state.at("listing.timestamp").empty());
REQUIRE(!state.at("processed.timestamp").empty());
REQUIRE(LogTestController::getInstance().contains("Successfully loaded state"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
REQUIRE(!LogTestController::getInstance().contains("key:filename value:file1.ext", std::chrono::seconds(0)));
}
TEST_CASE_METHOD(PersistentListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore state changed configuration", "[ListSFTP][tracking-timestamps]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
std::unordered_map<std::string, std::string> state;
REQUIRE(plan->getProcessContextForProcessor(list_sftp)->getStateManager()->get(state));
REQUIRE("localhost" == state.at("hostname"));
REQUIRE("nifiuser" == state.at("username"));
REQUIRE("nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
utils::Identifier list_sftp_uuid = list_sftp->getUUID();
REQUIRE(list_sftp_uuid);
createPlan(&list_sftp_uuid, std::make_shared<minifi::ConfigureImpl>());
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
LogTestController::getInstance().clear();
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4min);
testController.runSession(plan, true);
REQUIRE(plan->getProcessContextForProcessor(list_sftp)->getStateManager()->get(state));
REQUIRE("localhost" == state.at("hostname"));
REQUIRE("nifiuser" == state.at("username"));
REQUIRE("/nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
REQUIRE(LogTestController::getInstance().contains("Processor state was persisted with different settings than the current ones, ignoring. "
"Listing Strategy: \"Tracking Timestamps\" vs. \"Tracking Timestamps\", "
"Hostname: \"localhost\" vs. \"localhost\", "
"Username: \"nifiuser\" vs. \"nifiuser\", "
"Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps changed configuration", "[ListSFTP][tracking-timestamps]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().clear();
plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4min);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Processor state was persisted with different settings than the current ones, ignoring. "
"Listing Strategy: \"Tracking Timestamps\" vs. \"Tracking Timestamps\", "
"Hostname: \"localhost\" vs. \"localhost\", "
"Username: \"nifiuser\" vs. \"nifiuser\", "
"Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
plan->reset();
LogTestController::getInstance().clear();
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/tstFile.ext\" because it has not changed"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one new file", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().clear();
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4min);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one older file in tracking window", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().clear();
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -6min);
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one older file outside tracking window", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().clear();
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -6h);
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
REQUIRE(LogTestController::getInstance().contains("Skipping \"nifi_test/file2.ext\" because it has an older timestamp than the minimum timestamp to list"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file another file with the same timestamp", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
auto mtime = utils::file::last_write_time(working_directory / "vfs" / "nifi_test" / "file1.ext").value();
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().clear();
createFile("nifi_test/file2.ext", "Test content 2", mtime);
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file timestamp updated", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
auto file = working_directory / "vfs" / "nifi_test" / "file1.ext";
auto mtime = utils::file::last_write_time(file).value();
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().clear();
REQUIRE(utils::file::set_last_write_time(file, mtime + 1s));
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Found file \"nifi_test/file1.ext\" with newer timestamp"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().clear();
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file size changes", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
auto mtime = utils::file::last_write_time(working_directory / "vfs" / "nifi_test" / "file1.ext").value();
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().clear();
createFile("nifi_test/file1.ext", "Longer test content 1", mtime);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Found file \"nifi_test/file1.ext\" with different size: 14 -> 21"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().clear();
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
}
TEST_CASE_METHOD(PersistentListSFTPTestsFixture, "ListSFTP Tracking Entities restore state", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
utils::Identifier list_sftp_uuid = list_sftp->getUUID();
REQUIRE(list_sftp_uuid);
createPlan(&list_sftp_uuid, std::make_shared<minifi::ConfigureImpl>());
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
LogTestController::getInstance().clear();
std::unordered_map<std::string, std::string> state;
REQUIRE(plan->getProcessContextForProcessor(list_sftp)->getStateManager()->get(state));
REQUIRE("localhost" == state.at("hostname"));
REQUIRE("nifiuser" == state.at("username"));
REQUIRE("nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Entities" == state.at("listing_strategy"));
REQUIRE("nifi_test/file1.ext" == state.at("entity.0.name"));
REQUIRE("14" == state.at("entity.0.size"));
REQUIRE(!state.at("entity.0.timestamp").empty());
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4min);
testController.runSession(plan, true);
REQUIRE(plan->getProcessContextForProcessor(list_sftp)->getStateManager()->get(state));
REQUIRE("localhost" == state.at("hostname"));
REQUIRE("nifiuser" == state.at("username"));
REQUIRE("nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Entities" == state.at("listing_strategy"));
if (state.at("entity.0.name") == "nifi_test/file1.ext") {
REQUIRE("nifi_test/file1.ext" == state.at("entity.0.name"));
REQUIRE("nifi_test/file2.ext" == state.at("entity.1.name"));
} else {
REQUIRE("nifi_test/file2.ext" == state.at("entity.0.name"));
REQUIRE("nifi_test/file1.ext" == state.at("entity.1.name"));
}
REQUIRE("14" == state.at("entity.0.size"));
REQUIRE(!state.at("entity.0.timestamp").empty());
REQUIRE("14" == state.at("entity.1.size"));
REQUIRE(!state.at("entity.1.timestamp").empty());
REQUIRE(LogTestController::getInstance().contains("Successfully loaded state"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
REQUIRE(!LogTestController::getInstance().contains("key:filename value:file1.ext", std::chrono::seconds(0)));
}
TEST_CASE_METHOD(PersistentListSFTPTestsFixture, "ListSFTP Tracking Entities restore state changed configuration", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
std::unordered_map<std::string, std::string> state;
REQUIRE(plan->getProcessContextForProcessor(list_sftp)->getStateManager()->get(state));
REQUIRE("localhost" == state.at("hostname"));
REQUIRE("nifiuser" == state.at("username"));
REQUIRE("nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Entities" == state.at("listing_strategy"));
utils::Identifier list_sftp_uuid = list_sftp->getUUID();
REQUIRE(list_sftp_uuid);
createPlan(&list_sftp_uuid, std::make_shared<minifi::ConfigureImpl>());
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
LogTestController::getInstance().clear();
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4min);
testController.runSession(plan, true);
REQUIRE(plan->getProcessContextForProcessor(list_sftp)->getStateManager()->get(state));
REQUIRE("localhost" == state.at("hostname"));
REQUIRE("nifiuser" == state.at("username"));
REQUIRE("/nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Entities" == state.at("listing_strategy"));
REQUIRE(LogTestController::getInstance().contains("Processor state was persisted with different settings than the current ones, ignoring. "
"Listing Strategy: \"Tracking Entities\" vs. \"Tracking Entities\", "
"Hostname: \"localhost\" vs. \"localhost\", "
"Username: \"nifiuser\" vs. \"nifiuser\", "
"Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities changed configuration", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().clear();
plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4min);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Processor state was persisted with different settings than the current ones, ignoring. "
"Listing Strategy: \"Tracking Entities\" vs. \"Tracking Entities\", "
"Hostname: \"localhost\" vs. \"localhost\", "
"Username: \"nifiuser\" vs. \"nifiuser\", "
"Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities Initial Listing Target Tracking Time Window entity outside window", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
plan->setProperty(list_sftp, "Entity Tracking Initial Listing Target", minifi::processors::ListSFTP::ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW);
plan->setProperty(list_sftp, "Entity Tracking Time Window", "10 minutes");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1", -20min);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Skipping \"nifi_test/file1.ext\" because it has an older timestamp than the minimum timestamp to list"));
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities Initial Listing Target Tracking Time Window entity inside window", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
plan->setProperty(list_sftp, "Entity Tracking Initial Listing Target", minifi::processors::ListSFTP::ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW);
plan->setProperty(list_sftp, "Entity Tracking Time Window", "10 minutes");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
}