blob: 8e38f13dac9210d679a88af348e5276112d50150 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sys/stat.h>
#include <cstring>
#include <utility>
#include <chrono>
#include <fstream>
#include <memory>
#include <string>
#include <thread>
#include <type_traits>
#include <vector>
#include <iostream>
#include <algorithm>
#include <functional>
#include <iterator>
#include <random>
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "utils/file/FileUtils.h"
#include "core/logging/Logger.h"
#include "core/ProcessGroup.h"
#include "FlowController.h"
#include "unit/ProvenanceTestHelper.h"
#include "processors/FetchSFTP.h"
#include "processors/ListSFTP.h"
#include "processors/GenerateFlowFile.h"
#include "processors/LogAttribute.h"
#include "processors/PutFile.h"
#include "tools/SFTPTestServer.h"
using namespace std::literals::chrono_literals;
class ListThenFetchSFTPTestsFixture {
public:
ListThenFetchSFTPTestsFixture() {
LogTestController::getInstance().reset();
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setDebug<minifi::FlowController>();
LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
LogTestController::getInstance().setDebug<minifi::core::ProcessGroup>();
LogTestController::getInstance().setDebug<minifi::core::Processor>();
LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
LogTestController::getInstance().setDebug<minifi::processors::GenerateFlowFile>();
LogTestController::getInstance().setTrace<minifi::utils::SFTPClient>();
LogTestController::getInstance().setTrace<minifi::processors::ListSFTP>();
LogTestController::getInstance().setTrace<minifi::processors::FetchSFTP>();
LogTestController::getInstance().setTrace<minifi::processors::PutFile>();
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
LogTestController::getInstance().setDebug<SFTPTestServer>();
REQUIRE_FALSE(src_dir.empty());
REQUIRE_FALSE(dst_dir.empty());
REQUIRE(plan);
// Start SFTP server
sftp_server = std::make_unique<SFTPTestServer>(src_dir);
REQUIRE(true == sftp_server->start());
// Build MiNiFi processing graph
list_sftp = plan->addProcessor(
"ListSFTP",
"ListSFTP");
fetch_sftp = plan->addProcessor(
"FetchSFTP",
"FetchSFTP",
core::Relationship("success", "d"),
true);
log_attribute = plan->addProcessor("LogAttribute",
"LogAttribute",
{ core::Relationship("success", "d"),
core::Relationship("comms.failure", "d"),
core::Relationship("not.found", "d"),
core::Relationship("permission.denied", "d") },
true);
put_file = plan->addProcessor("PutFile",
"PutFile",
core::Relationship("success", "d"),
true);
// Configure ListSFTP processor
plan->setProperty(list_sftp, "Listing Strategy", minifi::processors::ListSFTP::LISTING_STRATEGY_TRACKING_TIMESTAMPS);
plan->setProperty(list_sftp, "Hostname", "localhost");
plan->setProperty(list_sftp, "Port", std::to_string(sftp_server->getPort()));
plan->setProperty(list_sftp, "Username", "nifiuser");
plan->setProperty(list_sftp, "Password", "nifipassword");
plan->setProperty(list_sftp, "Search Recursively", "false");
plan->setProperty(list_sftp, "Follow symlink", "false");
plan->setProperty(list_sftp, "Ignore Dotted Files", "false");
plan->setProperty(list_sftp, "Strict Host Key Checking", "false");
plan->setProperty(list_sftp, "Connection Timeout", "30 sec");
plan->setProperty(list_sftp, "Data Timeout", "30 sec");
plan->setProperty(list_sftp, "Send Keep Alive On Timeout", "true");
plan->setProperty(list_sftp, "Target System Timestamp Precision", minifi::processors::ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT);
plan->setProperty(list_sftp, "Minimum File Age", "0 sec");
plan->setProperty(list_sftp, "Minimum File Size", "0 B");
plan->setProperty(list_sftp, "Target System Timestamp Precision", "Seconds");
plan->setProperty(list_sftp, "Remote Path", "nifi_test/");
plan->setProperty(list_sftp, "State File", (src_dir / "state").string());
// Configure FetchSFTP processor
plan->setProperty(fetch_sftp, "Hostname", "localhost");
plan->setProperty(fetch_sftp, "Port", std::to_string(sftp_server->getPort()));
plan->setProperty(fetch_sftp, "Username", "nifiuser");
plan->setProperty(fetch_sftp, "Password", "nifipassword");
plan->setProperty(fetch_sftp, "Completion Strategy", minifi::processors::FetchSFTP::COMPLETION_STRATEGY_NONE);
plan->setProperty(fetch_sftp, "Connection Timeout", "30 sec");
plan->setProperty(fetch_sftp, "Data Timeout", "30 sec");
plan->setProperty(fetch_sftp, "Strict Host Key Checking", "false");
plan->setProperty(fetch_sftp, "Send Keep Alive On Timeout", "true");
plan->setProperty(fetch_sftp, "Use Compression", "false");
plan->setProperty(fetch_sftp, "Remote File", "${path}/${filename}");
// Configure LogAttribute processor
plan->setProperty(log_attribute, "FlowFiles To Log", "0");
// Configure PutFile processor
plan->setProperty(put_file, "Directory", (dst_dir / "${path}").string());
plan->setProperty(put_file, "Conflict Resolution Strategy", magic_enum::enum_name(minifi::processors::PutFile::FileExistsResolutionStrategy::fail));
plan->setProperty(put_file, "Create Missing Directories", "true");
}
ListThenFetchSFTPTestsFixture(ListThenFetchSFTPTestsFixture&&) = delete;
ListThenFetchSFTPTestsFixture(const ListThenFetchSFTPTestsFixture&) = delete;
ListThenFetchSFTPTestsFixture& operator=(ListThenFetchSFTPTestsFixture&&) = delete;
ListThenFetchSFTPTestsFixture& operator=(const ListThenFetchSFTPTestsFixture&) = delete;
virtual ~ListThenFetchSFTPTestsFixture() = default;
// Create source file
void createFile(const std::string& relative_path, const std::string& content, const std::optional<std::chrono::file_clock::time_point>& modification_time) const {
std::fstream file;
const auto full_path = src_dir / "vfs" / relative_path;
std::filesystem::create_directories(full_path.parent_path());
file.open(full_path, std::ios::out);
file << content;
file.close();
if (modification_time.has_value()) {
REQUIRE(utils::file::set_last_write_time(full_path, modification_time.value()));
}
}
void createFileWithModificationTimeDiff(const std::string& relative_path, const std::string& content, std::chrono::seconds modification_timediff = -5min) const {
return createFile(relative_path, content, std::chrono::file_clock::now() + modification_timediff);
}
enum TestWhere {
IN_DESTINATION,
IN_SOURCE
};
void testFile(TestWhere where, const std::filesystem::path& relative_path, std::string_view expected_content) const {
std::filesystem::path expected_path = where == IN_DESTINATION ? dst_dir / relative_path : src_dir / "vfs" / relative_path;
REQUIRE(std::filesystem::exists(expected_path));
std::filesystem::permissions(expected_path, static_cast<std::filesystem::perms>(0644));;
std::ifstream file(expected_path);
REQUIRE(file.good());
std::stringstream content;
std::vector<char> buffer(1024U);
while (file) {
file.read(buffer.data(), gsl::narrow<std::streamsize>(buffer.size()));
content << std::string(buffer.data(), file.gcount());
}
CHECK(expected_content == content.str());
}
protected:
TestController testController;
std::filesystem::path src_dir = testController.createTempDirectory();
std::filesystem::path dst_dir = testController.createTempDirectory();
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::unique_ptr<SFTPTestServer> sftp_server;
core::Processor* list_sftp;
core::Processor* fetch_sftp;
core::Processor* log_attribute;
core::Processor* put_file;
};
TEST_CASE_METHOD(ListThenFetchSFTPTestsFixture, "ListSFTP then FetchSFTP one file", "[ListThenFetchSFTP][basic]") {
createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
}
TEST_CASE_METHOD(ListThenFetchSFTPTestsFixture, "ListSFTP then FetchSFTP two files", "[ListThenFetchSFTP][basic]") {
createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2");
/* ListSFTP */
plan->runNextProcessor();
/* FetchSFTP */
plan->runNextProcessor();
plan->runCurrentProcessor();
/* LogAttribute */
plan->runNextProcessor();
/* PutFile */
plan->runNextProcessor();
plan->runCurrentProcessor();
testFile(IN_SOURCE, "nifi_test/file1.ext", "Test content 1");
testFile(IN_DESTINATION, "nifi_test/file1.ext", "Test content 1");
testFile(IN_SOURCE, "nifi_test/file2.ext", "Test content 2");
testFile(IN_DESTINATION, "nifi_test/file2.ext", "Test content 2");
}