blob: e537bc446005e1f11785c55d6514fa506d05406c [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sys/stat.h>
#undef NDEBUG
#include <cassert>
#include <cstring>
#include <utility>
#include <chrono>
#include <fstream>
#include <memory>
#include <string>
#include <thread>
#include <type_traits>
#include <vector>
#include <iostream>
#include <sstream>
#include <algorithm>
#include <functional>
#include <iterator>
#include <random>
#ifndef WIN32
#include <unistd.h>
#endif
#include "TestBase.h"
#include "utils/StringUtils.h"
#include "utils/file/FileUtils.h"
#include "core/Core.h"
#include "core/logging/Logger.h"
#include "core/ProcessGroup.h"
#include "core/yaml/YamlConfiguration.h"
#include "FlowController.h"
#include "properties/Configure.h"
#include "unit/ProvenanceTestHelper.h"
#include "io/StreamFactory.h"
#include "processors/ListSFTP.h"
#include "processors/GenerateFlowFile.h"
#include "processors/LogAttribute.h"
#include "processors/UpdateAttribute.h"
#include "tools/SFTPTestServer.h"
class ListSFTPTestsFixture {
public:
ListSFTPTestsFixture()
: src_dir(strdup("/var/tmp/sftps.XXXXXX")) {
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setDebug<minifi::FlowController>();
LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
LogTestController::getInstance().setDebug<minifi::core::ProcessGroup>();
LogTestController::getInstance().setDebug<minifi::core::Processor>();
LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
LogTestController::getInstance().setDebug<processors::GenerateFlowFile>();
LogTestController::getInstance().setTrace<minifi::utils::SFTPClient>();
LogTestController::getInstance().setTrace<processors::ListSFTP>();
LogTestController::getInstance().setDebug<processors::LogAttribute>();
LogTestController::getInstance().setDebug<SFTPTestServer>();
// Create temporary directories
testController.createTempDirectory(src_dir);
REQUIRE(src_dir != nullptr);
// Start SFTP server
sftp_server = std::unique_ptr<SFTPTestServer>(new SFTPTestServer(src_dir));
REQUIRE(true == sftp_server->start());
// Build MiNiFi processing graph
createPlan();
}
virtual ~ListSFTPTestsFixture() {
free(src_dir);
LogTestController::getInstance().reset();
}
void createPlan(utils::Identifier* list_sftp_uuid = nullptr) {
const std::string state_dir = plan == nullptr ? "" : plan->getStateDir();
log_attribute.reset();
list_sftp.reset();
plan.reset();
plan = testController.createPlan(nullptr /*config*/, state_dir.empty() ? nullptr : state_dir.c_str());
if (list_sftp_uuid == nullptr) {
list_sftp = plan->addProcessor(
"ListSFTP",
"ListSFTP");
} else {
list_sftp = plan->addProcessor(
"ListSFTP",
*list_sftp_uuid,
"ListSFTP",
{core::Relationship("success", "d")});
}
log_attribute = plan->addProcessor("LogAttribute",
"LogAttribute",
core::Relationship("success", "d"),
true);
// Configure ListSFTP processor
plan->setProperty(list_sftp, "Listing Strategy", processors::ListSFTP::LISTING_STRATEGY_TRACKING_TIMESTAMPS);
plan->setProperty(list_sftp, "Hostname", "localhost");
plan->setProperty(list_sftp, "Port", std::to_string(sftp_server->getPort()));
plan->setProperty(list_sftp, "Username", "nifiuser");
plan->setProperty(list_sftp, "Password", "nifipassword");
plan->setProperty(list_sftp, "Search Recursively", "false");
plan->setProperty(list_sftp, "Follow symlink", "false");
plan->setProperty(list_sftp, "Ignore Dotted Files", "false");
plan->setProperty(list_sftp, "Strict Host Key Checking", "false");
plan->setProperty(list_sftp, "Connection Timeout", "30 sec");
plan->setProperty(list_sftp, "Data Timeout", "30 sec");
plan->setProperty(list_sftp, "Send Keep Alive On Timeout", "true");
plan->setProperty(list_sftp, "Target System Timestamp Precision", processors::ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT);
plan->setProperty(list_sftp, "Minimum File Age", "0 sec");
plan->setProperty(list_sftp, "Minimum File Size", "0 B");
plan->setProperty(list_sftp, "Target System Timestamp Precision", "Seconds");
plan->setProperty(list_sftp, "Remote Path", "nifi_test/");
// Configure LogAttribute processor
plan->setProperty(log_attribute, "FlowFiles To Log", "0");
}
// Create source file
void createFile(const std::string& relative_path, const std::string& content, uint64_t modification_timestamp = 0U) {
std::fstream file;
std::stringstream ss;
ss << src_dir << "/vfs/" << relative_path;
auto full_path = ss.str();
std::deque<std::string> parent_dirs;
std::string parent_dir = full_path;
while ((parent_dir = utils::file::FileUtils::get_parent_path(parent_dir)) != "") {
parent_dirs.push_front(parent_dir);
}
for (const auto& dir : parent_dirs) {
utils::file::FileUtils::create_dir(dir);
}
file.open(ss.str(), std::ios::out);
file << content;
file.close();
if (modification_timestamp != 0U) {
REQUIRE(true == utils::file::FileUtils::set_last_write_time(full_path, modification_timestamp));
}
}
void createFileWithModificationTimeDiff(const std::string& relative_path, const std::string& content, int64_t modification_timediff = -300 /*5 minutes ago*/) {
time_t now = time(nullptr);
return createFile(relative_path, content, now + modification_timediff);
}
protected:
char *src_dir;
std::unique_ptr<SFTPTestServer> sftp_server;
TestController testController;
std::shared_ptr<TestPlan> plan;
std::shared_ptr<core::Processor> list_sftp;
std::shared_ptr<core::Processor> log_attribute;
};
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list one file", "[ListSFTP][basic]") {
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP public key authentication", "[ListSFTP][basic]") {
plan->setProperty(list_sftp, "Remote File", "nifi_test/tstFile.ext");
plan->setProperty(list_sftp, "Private Key Path", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/id_rsa"));
plan->setProperty(list_sftp, "Private Key Passphrase", "privatekeypassword");
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Successfully authenticated with publickey"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list non-existing dir", "[ListSFTP][basic]") {
plan->setProperty(list_sftp, "Remote Path", "nifi_test2/");
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("Failed to open remote directory \"nifi_test2\", error: LIBSSH2_FX_NO_SUCH_FILE"));
REQUIRE(LogTestController::getInstance().contains("There are no files to list. Yielding."));
}
#ifndef WIN32
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list non-readable dir", "[ListSFTP][basic]") {
if (getuid() == 0) {
std::cerr << "!!!! This test does NOT work as root. Exiting. !!!!" << std::endl;
return;
}
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
REQUIRE(0 == chmod((std::string(src_dir) + "/vfs/nifi_test").c_str(), 0000));
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("Failed to open remote directory \"nifi_test\", error: LIBSSH2_FX_PERMISSION_DENIED"));
REQUIRE(LogTestController::getInstance().contains("There are no files to list. Yielding."));
}
#endif
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list one file writes attributes", "[ListSFTP][basic]") {
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
auto file = std::string(src_dir) + "/vfs/nifi_test/tstFile.ext";
auto mtime = utils::file::FileUtils::last_write_time(file);
std::string mtime_str;
REQUIRE(true == utils::timeutils::getDateTimeStr(mtime, mtime_str));
uint64_t uid, gid;
REQUIRE(true == utils::file::FileUtils::get_uid_gid(file, uid, gid));
uint32_t permissions;
REQUIRE(true == utils::file::FileUtils::get_permissions(file, permissions));
std::stringstream permissions_ss;
permissions_ss << std::setfill('0') << std::setw(4) << std::oct << permissions;
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
REQUIRE(LogTestController::getInstance().contains("key:sftp.listing.user value:nifiuser"));
REQUIRE(LogTestController::getInstance().contains("key:file.owner value:" + std::to_string(uid)));
REQUIRE(LogTestController::getInstance().contains("key:file.group value:" + std::to_string(gid)));
REQUIRE(LogTestController::getInstance().contains("key:file.permissions value:" + permissions_ss.str()));
REQUIRE(LogTestController::getInstance().contains("key:file.size value:14"));
REQUIRE(LogTestController::getInstance().contains("key:file.lastModifiedTime value:" + mtime_str));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list two files", "[ListSFTP][basic]") {
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test with longer content 2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list two files one in a subdir no recursion", "[ListSFTP][basic]") {
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
createFileWithModificationTimeDiff("nifi_test/subdir/file2.ext", "Test with longer content 2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list two files one in a subdir with recursion", "[ListSFTP][basic]") {
plan->setProperty(list_sftp, "Search Recursively", "true");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
createFileWithModificationTimeDiff("nifi_test/subdir/file2.ext", "Test with longer content 2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Minimum File Age too young", "[ListSFTP][file-age]") {
plan->setProperty(list_sftp, "Minimum File Age", "2 hours");
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/tstFile.ext\" because it is younger than the Minimum File Age"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Maximum File Age too old", "[ListSFTP][file-age]") {
plan->setProperty(list_sftp, "Maximum File Age", "1 min");
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/tstFile.ext\" because it is older than the Maximum File Age"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Minimum File Size too small", "[ListSFTP][file-size]") {
plan->setProperty(list_sftp, "Minimum File Size", "1 MB");
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/tstFile.ext\" because it is smaller than the Minimum File Size: 14 B < 1048576 B"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Maximum File Size too large", "[ListSFTP][file-size]") {
plan->setProperty(list_sftp, "Maximum File Size", "4 B");
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/tstFile.ext\" because it is larger than the Maximum File Size: 14 B > 4 B"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP File Filter Regex", "[ListSFTP][file-filter-regex]") {
plan->setProperty(list_sftp, "File Filter Regex", "^.*2.*$");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test with longer content 2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/file1.ext\" because it did not match the File Filter Regex \"^.*2.*$\""));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Path Filter Regex", "[ListSFTP][path-filter-regex]") {
plan->setProperty(list_sftp, "Search Recursively", "true");
plan->setProperty(list_sftp, "Path Filter Regex", "^.*foobar.*$");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
createFileWithModificationTimeDiff("nifi_test/foobar/file2.ext", "Test content 2");
createFileWithModificationTimeDiff("nifi_test/notbar/file3.ext", "Test with longer content 3");
testController.runSession(plan, true);
/* file1.ext is in the root */
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
/* file2.ext is in a matching subdirectory */
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
/* file3.ext is in a non-matching subdirectory */
REQUIRE(LogTestController::getInstance().contains("Not recursing into \"nifi_test/notbar\" because it did not match the Path Filter Regex \"^.*foobar.*$\""));
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file3.ext"));
}
#ifndef WIN32
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink false file symlink", "[ListSFTP][follow-symlink]") {
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
auto file1 = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
auto file2 = std::string(src_dir) + "/vfs/nifi_test/file2.ext";
REQUIRE(0 == symlink(file1.c_str(), file2.c_str()));
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("Skipping non-regular, non-directory file \"nifi_test/file2.ext\""));
}
#endif
#ifndef WIN32
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink true file symlink", "[ListSFTP][follow-symlink]") {
plan->setProperty(list_sftp, "Follow symlink", "true");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
auto file1 = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
auto file2 = std::string(src_dir) + "/vfs/nifi_test/file2.ext";
REQUIRE(0 == symlink(file1.c_str(), file2.c_str()));
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
#endif
#ifndef WIN32
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink false directory symlink", "[ListSFTP][follow-symlink]") {
plan->setProperty(list_sftp, "Search Recursively", "true");
createFileWithModificationTimeDiff("nifi_test/dir1/file1.ext", "Test content 1");
auto dir1 = std::string(src_dir) + "/vfs/nifi_test/dir1";
auto dir2 = std::string(src_dir) + "/vfs/nifi_test/dir2";
REQUIRE(0 == symlink(dir1.c_str(), dir2.c_str()));
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("Skipping non-regular, non-directory file \"nifi_test/dir2\""));
}
#endif
#ifndef WIN32
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink true directory symlink", "[ListSFTP][follow-symlink]") {
plan->setProperty(list_sftp, "Search Recursively", "true");
plan->setProperty(list_sftp, "Follow symlink", "true");
createFileWithModificationTimeDiff("nifi_test/dir1/file1.ext", "Test content 1");
auto dir1 = std::string(src_dir) + "/vfs/nifi_test/dir1";
auto dir2 = std::string(src_dir) + "/vfs/nifi_test/dir2";
REQUIRE(0 == symlink(dir1.c_str(), dir2.c_str()));
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/dir1"));
REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/dir2"));
}
#endif
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file", "[ListSFTP][tracking-timestamps]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("The latest listed entry timestamp is the same as the last listed entry timestamp"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file one new file", "[ListSFTP][tracking-timestamps]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file one older file", "[ListSFTP][tracking-timestamps]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -360 /* 6 minutes ago */);
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("Skipping \"nifi_test/file2.ext\", because it is not new."));
REQUIRE(LogTestController::getInstance().contains("The latest listed entry timestamp is the same as the last listed entry timestamp"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file another file with the same timestamp", "[ListSFTP][tracking-timestamps]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
auto file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
auto mtime = utils::file::FileUtils::last_write_time(file);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
/* We must sleep to avoid triggering the listing lag. */
std::this_thread::sleep_for(std::chrono::milliseconds(1500));
createFile("nifi_test/file2.ext", "Test content 2", mtime);
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file timestamp updated", "[ListSFTP][tracking-timestamps]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
auto file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
auto mtime = utils::file::FileUtils::last_write_time(file);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
REQUIRE(true == utils::file::FileUtils::set_last_write_time(file, mtime + 1));
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
/* We must sleep to avoid triggering the listing lag. */
std::this_thread::sleep_for(std::chrono::milliseconds(1500));
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("and all files for that timestamp has been processed. Yielding."));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore state", "[ListSFTP][tracking-timestamps]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
std::unordered_map<std::string, std::string> state;
REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
REQUIRE("localhost" == state.at("hostname"));
REQUIRE("nifiuser" == state.at("username"));
REQUIRE("nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
REQUIRE("nifi_test/file1.ext" == state.at("id.0"));
REQUIRE(!state.at("listing.timestamp").empty());
REQUIRE(!state.at("processed.timestamp").empty());
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
utils::Identifier list_sftp_uuid = list_sftp->getUUID();
REQUIRE(list_sftp_uuid);
createPlan(&list_sftp_uuid);
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
testController.runSession(plan, true);
REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
REQUIRE("localhost" == state.at("hostname"));
REQUIRE("nifiuser" == state.at("username"));
REQUIRE("nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
REQUIRE("nifi_test/file2.ext" == state.at("id.0"));
REQUIRE(!state.at("listing.timestamp").empty());
REQUIRE(!state.at("processed.timestamp").empty());
REQUIRE(LogTestController::getInstance().contains("Successfully loaded state"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
REQUIRE(!LogTestController::getInstance().contains("key:filename value:file1.ext", std::chrono::seconds(0)));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore state changed configuration", "[ListSFTP][tracking-timestamps]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
std::unordered_map<std::string, std::string> state;
REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
REQUIRE("localhost" == state.at("hostname"));
REQUIRE("nifiuser" == state.at("username"));
REQUIRE("nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
utils::Identifier list_sftp_uuid = list_sftp->getUUID();
REQUIRE(list_sftp_uuid);
createPlan(&list_sftp_uuid);
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
testController.runSession(plan, true);
REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
REQUIRE("localhost" == state.at("hostname"));
REQUIRE("nifiuser" == state.at("username"));
REQUIRE("/nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
REQUIRE(LogTestController::getInstance().contains("Processor state was persisted with different settings than the current ones, ignoring. "
"Listing Strategy: \"Tracking Timestamps\" vs. \"Tracking Timestamps\", "
"Hostname: \"localhost\" vs. \"localhost\", "
"Username: \"nifiuser\" vs. \"nifiuser\", "
"Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps changed configuration", "[ListSFTP][tracking-timestamps]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Processor state was persisted with different settings than the current ones, ignoring. "
"Listing Strategy: \"Tracking Timestamps\" vs. \"Tracking Timestamps\", "
"Hostname: \"localhost\" vs. \"localhost\", "
"Username: \"nifiuser\" vs. \"nifiuser\", "
"Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/tstFile.ext\" because it has not changed"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one new file", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one older file in tracking window", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -360 /* 6 minutes ago */);
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one older file outside tracking window", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -6 * 3600 /* 6 hours ago */);
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
REQUIRE(LogTestController::getInstance().contains("Skipping \"nifi_test/file2.ext\" because it has an older timestamp than the minimum timestamp to list"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file another file with the same timestamp", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
auto file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
auto mtime = utils::file::FileUtils::last_write_time(file);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
createFile("nifi_test/file2.ext", "Test content 2", mtime);
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file timestamp updated", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
auto file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
auto mtime = utils::file::FileUtils::last_write_time(file);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
REQUIRE(true == utils::file::FileUtils::set_last_write_time(file, mtime + 1));
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Found file \"nifi_test/file1.ext\" with newer timestamp"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file size changes", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
auto file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
auto mtime = utils::file::FileUtils::last_write_time(file);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
createFile("nifi_test/file1.ext", "Longer test content 1", mtime);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Found file \"nifi_test/file1.ext\" with different size: 14 -> 21"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities restore state", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
utils::Identifier list_sftp_uuid = list_sftp->getUUID();
REQUIRE(list_sftp_uuid);
createPlan(&list_sftp_uuid);
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
std::unordered_map<std::string, std::string> state;
REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
REQUIRE("localhost" == state.at("hostname"));
REQUIRE("nifiuser" == state.at("username"));
REQUIRE("nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Entities" == state.at("listing_strategy"));
REQUIRE("nifi_test/file1.ext" == state.at("entity.0.name"));
REQUIRE("14" == state.at("entity.0.size"));
REQUIRE(!state.at("entity.0.timestamp").empty());
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
testController.runSession(plan, true);
REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
REQUIRE("localhost" == state.at("hostname"));
REQUIRE("nifiuser" == state.at("username"));
REQUIRE("nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Entities" == state.at("listing_strategy"));
if (state.at("entity.0.name") == "nifi_test/file1.ext") {
REQUIRE("nifi_test/file1.ext" == state.at("entity.0.name"));
REQUIRE("nifi_test/file2.ext" == state.at("entity.1.name"));
} else {
REQUIRE("nifi_test/file2.ext" == state.at("entity.0.name"));
REQUIRE("nifi_test/file1.ext" == state.at("entity.1.name"));
}
REQUIRE("14" == state.at("entity.0.size"));
REQUIRE(!state.at("entity.0.timestamp").empty());
REQUIRE("14" == state.at("entity.1.size"));
REQUIRE(!state.at("entity.1.timestamp").empty());
REQUIRE(LogTestController::getInstance().contains("Successfully loaded state"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
REQUIRE(!LogTestController::getInstance().contains("key:filename value:file1.ext", std::chrono::seconds(0)));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities restore state changed configuration", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
std::unordered_map<std::string, std::string> state;
REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
REQUIRE("localhost" == state.at("hostname"));
REQUIRE("nifiuser" == state.at("username"));
REQUIRE("nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Entities" == state.at("listing_strategy"));
utils::Identifier list_sftp_uuid = list_sftp->getUUID();
REQUIRE(list_sftp_uuid);
createPlan(&list_sftp_uuid);
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
testController.runSession(plan, true);
REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
REQUIRE("localhost" == state.at("hostname"));
REQUIRE("nifiuser" == state.at("username"));
REQUIRE("/nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Entities" == state.at("listing_strategy"));
REQUIRE(LogTestController::getInstance().contains("Processor state was persisted with different settings than the current ones, ignoring. "
"Listing Strategy: \"Tracking Entities\" vs. \"Tracking Entities\", "
"Hostname: \"localhost\" vs. \"localhost\", "
"Username: \"nifiuser\" vs. \"nifiuser\", "
"Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities changed configuration", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Processor state was persisted with different settings than the current ones, ignoring. "
"Listing Strategy: \"Tracking Entities\" vs. \"Tracking Entities\", "
"Hostname: \"localhost\" vs. \"localhost\", "
"Username: \"nifiuser\" vs. \"nifiuser\", "
"Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities Initial Listing Target Tracking Time Window entity outside window", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
plan->setProperty(list_sftp, "Entity Tracking Initial Listing Target", processors::ListSFTP::ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW);
plan->setProperty(list_sftp, "Entity Tracking Time Window", "10 minutes");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1", -20*60 /* 20 minutes ago */);
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Skipping \"nifi_test/file1.ext\" because it has an older timestamp than the minimum timestamp to list"));
REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities Initial Listing Target Tracking Time Window entity inside window", "[ListSFTP][tracking-entities]") {
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
plan->setProperty(list_sftp, "Entity Tracking Initial Listing Target", processors::ListSFTP::ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW);
plan->setProperty(list_sftp, "Entity Tracking Time Window", "10 minutes");
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
}