blob: b9189dfc51396dd519968c856a7e48499a0ca3d5 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sys/stat.h>
#undef NDEBUG
#include <cassert>
#include <cstring>
#include <utility>
#include <chrono>
#include <fstream>
#include <memory>
#include <string>
#include <thread>
#include <type_traits>
#include <vector>
#include <iostream>
#include <sstream>
#include <algorithm>
#include <functional>
#include <iterator>
#include <random>
#ifndef WIN32
#include <unistd.h>
#endif
#include "TestBase.h"
#include "utils/StringUtils.h"
#include "utils/file/FileUtils.h"
#include "core/Core.h"
#include "core/logging/Logger.h"
#include "core/ProcessGroup.h"
#include "core/yaml/YamlConfiguration.h"
#include "FlowController.h"
#include "properties/Configure.h"
#include "unit/ProvenanceTestHelper.h"
#include "io/StreamFactory.h"
#include "processors/PutSFTP.h"
#include "processors/GetFile.h"
#include "processors/LogAttribute.h"
#include "processors/ExtractText.h"
#include "processors/UpdateAttribute.h"
#include "tools/SFTPTestServer.h"
class PutSFTPTestsFixture {
public:
PutSFTPTestsFixture()
: src_dir(strdup("/tmp/sftps.XXXXXX"))
, dst_dir(strdup("/tmp/sftpd.XXXXXX")) {
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setDebug<minifi::FlowController>();
LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
LogTestController::getInstance().setDebug<minifi::core::ProcessGroup>();
LogTestController::getInstance().setDebug<minifi::core::Processor>();
LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
LogTestController::getInstance().setDebug<processors::GetFile>();
LogTestController::getInstance().setTrace<minifi::utils::SFTPClient>();
LogTestController::getInstance().setTrace<processors::PutSFTP>();
LogTestController::getInstance().setTrace<processors::ExtractText>();
LogTestController::getInstance().setDebug<processors::LogAttribute>();
LogTestController::getInstance().setDebug<SFTPTestServer>();
// Create temporary directories
testController.createTempDirectory(src_dir);
REQUIRE(src_dir != nullptr);
testController.createTempDirectory(dst_dir);
REQUIRE(dst_dir != nullptr);
// Start SFTP server
sftp_server = std::unique_ptr<SFTPTestServer>(new SFTPTestServer(dst_dir));
REQUIRE(true == sftp_server->start());
// Build MiNiFi processing graph
plan = testController.createPlan();
get_file = plan->addProcessor(
"GetFile",
"GetFile");
put = plan->addProcessor(
"PutSFTP",
"PutSFTP",
core::Relationship("success", "d"),
true);
plan->addProcessor("LogAttribute",
"LogAttribute",
{ core::Relationship("success", "d"),
core::Relationship("reject", "d"),
core::Relationship("failure", "d") },
true);
// Configure GetFile processor
plan->setProperty(get_file, "Input Directory", src_dir);
// Configure PutSFTP processor
plan->setProperty(put, "Hostname", "localhost");
plan->setProperty(put, "Port", std::to_string(sftp_server->getPort()));
plan->setProperty(put, "Username", "nifiuser");
plan->setProperty(put, "Password", "nifipassword");
plan->setProperty(put, "Remote Path", "nifi_test/");
plan->setProperty(put, "Create Directory", "true");
plan->setProperty(put, "Batch Size", "2");
plan->setProperty(put, "Connection Timeout", "30 sec");
plan->setProperty(put, "Data Timeout", "30 sec");
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_RENAME);
plan->setProperty(put, "Strict Host Key Checking", "false");
plan->setProperty(put, "Send Keep Alive On Timeout", "true");
plan->setProperty(put, "Use Compression", "false");
plan->setProperty(put, "Reject Zero-Byte Files", "true");
}
virtual ~PutSFTPTestsFixture() {
free(src_dir);
free(dst_dir);
LogTestController::getInstance().reset();
}
// Create source file
void createFile(const std::string &dir, const std::string& relative_path, const std::string& content) {
std::fstream file;
std::stringstream ss;
ss << dir << "/" << relative_path;
file.open(ss.str(), std::ios::out);
file << content;
file.close();
}
// Test target file
void testFile(const std::string& relative_path, const std::string& expected_content) {
std::stringstream resultFile;
resultFile << dst_dir << "/vfs/" << relative_path;
std::ifstream file(resultFile.str());
REQUIRE(true == file.good());
std::stringstream content;
std::vector<char> buffer(1024U);
while (file) {
file.read(buffer.data(), buffer.size());
content << std::string(buffer.data(), file.gcount());
}
REQUIRE(expected_content == content.str());
}
void testFileNotExists(const std::string& relative_path) {
std::stringstream resultFile;
resultFile << dst_dir << "/vfs/" << relative_path;
std::ifstream file(resultFile.str());
REQUIRE(false == file.is_open());
REQUIRE(false == file.good());
}
void testModificationTime(const std::string& relative_path, int64_t mtime) {
std::stringstream resultFile;
resultFile << dst_dir << "/vfs/" << relative_path;
REQUIRE(mtime == utils::file::FileUtils::last_write_time(resultFile.str()));
}
void testPermissions(const std::string& relative_path, uint32_t expected_permissions) {
std::stringstream resultFile;
resultFile << dst_dir << "/vfs/" << relative_path;
uint32_t permissions = 0U;
REQUIRE(true == utils::file::FileUtils::get_permissions(resultFile.str(), permissions));
REQUIRE(expected_permissions == permissions);
}
void testOwner(const std::string& relative_path, uint64_t expected_uid) {
std::stringstream resultFile;
resultFile << dst_dir << "/vfs/" << relative_path;
uint64_t uid = 0U;
uint64_t gid = 0U;
REQUIRE(true == utils::file::FileUtils::get_uid_gid(resultFile.str(), uid, gid));
REQUIRE(expected_uid == uid);
}
void testGroup(const std::string& relative_path, uint64_t expected_gid) {
std::stringstream resultFile;
resultFile << dst_dir << "/vfs/" << relative_path;
uint64_t uid = 0U;
uint64_t gid = 0U;
REQUIRE(true == utils::file::FileUtils::get_uid_gid(resultFile.str(), uid, gid));
REQUIRE(expected_gid == gid);
}
size_t directoryContentCount(const std::string& dir) {
size_t count = 0U;
utils::file::FileUtils::list_dir(dir, [&count](const std::string&, const std::string&) {
count++;
return true;
}, testController.getLogger());
return count;
}
protected:
char *src_dir;
char *dst_dir;
std::unique_ptr<SFTPTestServer> sftp_server;
TestController testController;
std::shared_ptr<TestPlan> plan;
std::shared_ptr<core::Processor> get_file;
std::shared_ptr<core::Processor> put;
};
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP put one file", "[PutSFTP][basic]") {
createFile(src_dir, "tstFile.ext", "tempFile");
testController.runSession(plan, true);
testFile("nifi_test/tstFile.ext", "tempFile");
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP put two files", "[PutSFTP][basic]") {
createFile(src_dir, "tstFile1.ext", "content 1");
createFile(src_dir, "tstFile2.ext", "content 2");
testController.runSession(plan, true);
testFile("nifi_test/tstFile1.ext", "content 1");
testFile("nifi_test/tstFile2.ext", "content 2");
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP bad password", "[PutSFTP][authentication]") {
plan->setProperty(put, "Password", "badpassword");
createFile(src_dir, "tstFile.ext", "tempFile");
try {
testController.runSession(plan, true);
} catch (std::exception &e) {
std::string expected("Process Session Operation:Can not find the transfer relationship for the updated flow");
REQUIRE(0 == std::string(e.what()).compare(0, expected.size(), expected));
}
REQUIRE(LogTestController::getInstance().contains("Failed to authenticate with password, error: Authentication failed (username/password)"));
REQUIRE(LogTestController::getInstance().contains("Could not authenticate with any available method"));
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP public key authentication success", "[PutSFTP][authentication]") {
plan->setProperty(put, "Private Key Path", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/id_rsa"));
plan->setProperty(put, "Private Key Passphrase", "privatekeypassword");
createFile(src_dir, "tstFile.ext", "tempFile");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Successfully authenticated with publickey"));
testFile("nifi_test/tstFile.ext", "tempFile");
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP public key authentication bad passphrase", "[PutSFTP][authentication]") {
plan->setProperty(put, "Password", "");
plan->setProperty(put, "Private Key Path", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/id_rsa"));
plan->setProperty(put, "Private Key Passphrase", "badpassword");
createFile(src_dir, "tstFile.ext", "tempFile");
try {
testController.runSession(plan, true);
} catch (std::exception &e) {
std::string expected("Process Session Operation:Can not find the transfer relationship for the updated flow");
REQUIRE(0 == std::string(e.what()).compare(0, expected.size(), expected));
}
REQUIRE(LogTestController::getInstance().contains("Failed to authenticate with publickey, error: Unable to extract public key from private key file: Wrong passphrase or invalid/unrecognized private key file format"));
REQUIRE(LogTestController::getInstance().contains("Could not authenticate with any available method"));
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP public key authentication bad passphrase fallback to password", "[PutSFTP][authentication]") {
plan->setProperty(put, "Private Key Path", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/id_rsa"));
plan->setProperty(put, "Private Key Passphrase", "badpassword");
createFile(src_dir, "tstFile.ext", "tempFile");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Failed to authenticate with publickey, error: Unable to extract public key from private key file: Wrong passphrase or invalid/unrecognized private key file format"));
REQUIRE(LogTestController::getInstance().contains("Successfully authenticated with password"));
testFile("nifi_test/tstFile.ext", "tempFile");
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP host key checking success", "[PutSFTP][hostkey]") {
plan->setProperty(put, "Host Key File", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/known_hosts"));
plan->setProperty(put, "Strict Host Key Checking", "true");
createFile(src_dir, "tstFile.ext", "tempFile");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Host key verification succeeded for localhost"));
testFile("nifi_test/tstFile.ext", "tempFile");
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP host key checking missing strict", "[PutSFTP][hostkey]") {
plan->setProperty(put, "Hostname", "127.0.0.1");
plan->setProperty(put, "Host Key File", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/known_hosts"));
plan->setProperty(put, "Strict Host Key Checking", "true");
createFile(src_dir, "tstFile.ext", "tempFile");
try {
testController.runSession(plan, true);
} catch (std::exception &e) {
std::string expected("Process Session Operation:Can not find the transfer relationship for the updated flow");
REQUIRE(0 == std::string(e.what()).compare(0, expected.size(), expected));
}
REQUIRE(LogTestController::getInstance().contains("Host 127.0.0.1 not found in the host key file"));
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP host key checking missing non-strict", "[PutSFTP][hostkey]") {
plan->setProperty(put, "Hostname", "127.0.0.1");
plan->setProperty(put, "Host Key File", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/known_hosts"));
plan->setProperty(put, "Strict Host Key Checking", "false");
createFile(src_dir, "tstFile.ext", "tempFile");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Host 127.0.0.1 not found in the host key file"));
testFile("nifi_test/tstFile.ext", "tempFile");
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP host key checking mismatch strict", "[PutSFTP][hostkey]") {
plan->setProperty(put, "Host Key File", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/known_hosts_mismatch"));
plan->setProperty(put, "Strict Host Key Checking", "true");
createFile(src_dir, "tstFile.ext", "tempFile");
try {
testController.runSession(plan, true);
} catch (std::exception &e) {
std::string expected("Process Session Operation:Can not find the transfer relationship for the updated flow");
REQUIRE(0 == std::string(e.what()).compare(0, expected.size(), expected));
}
REQUIRE(LogTestController::getInstance().contains("Host key mismatch for localhost, expected: "
"AAAAB3NzaC1yc2EAAAADAQABAAABAQCueV6fHbTECMPps4nXJ9jiVcxArTKXYip+"
"SEIBwkvmQiEDj4/zldU4KUn4QRwqbFmR9JO3s3SkPVzvP9bKh2Xk3nICB73iMs4v"
"wO2nZKpkBFtNz6+w0LsqDzQe9piW0ukoXw2Ce41yQK+9xtugPVHbbchP0esDanDf"
"SGjbQyPsmarfuJ8K+ACgGmWB9GKSthq7j+gArgefz0SGHkoKXA+3OXF/D6/MnLLv"
"H1sYmVyIO6CzDiurHUS2o7MWjiw3qK+n6o9rW0fpLFM/l3+8dLCt3e6D+lLQsJVX"
"iL1TJVu+Lf2z9kMc+3brFjFNVBRDxjYMVjOhUr6JyU3ouL1i6P/9"
", actual: "
"AAAAB3NzaC1yc2EAAAADAQABAAABAQCrRDRfH278iGChp1a5hSMzQcDd63YoA2Np"
"VELEXzKmrPOuUgQXpdzdxk17oTdh5D+xTax2sTa3ZR55RWl1q6keqnrRvihWpBqF"
"N6D0aKUmGe/9Xlxhfe8v/RBs3j9JxlmbPtIFwKXF1ePfLFaI6n1BbXs0WUR8M7Cw"
"4OMFYTuvQ7IcrPE/FU+Xh4hdNm6y3j0ppKBj3LnZOABI/Ql/fyJUTpRqrLIqqdMi"
"3zxLNVBx7mVZQZICO1IPkh7ZqT0s3HyGT5hlsVuZsRfpqOjT4QfBeNjWuHdB5cDs"
"mDbaOvO6iQ/NY63uy7t/2VGmSASSzi4wzILvXKQTL3Lx5de9Tol7"));
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP conflict resolution rename", "[PutSFTP][conflict-resolution]") {
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_RENAME);
createFile(src_dir, "tstFile1.ext", "content 1");
REQUIRE(0 == utils::file::FileUtils::create_dir(utils::file::FileUtils::concat_path(dst_dir, "vfs/nifi_test")));
createFile(utils::file::FileUtils::concat_path(dst_dir, "vfs"), "nifi_test/tstFile1.ext", "content 2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship success"));
testFile("nifi_test/1.tstFile1.ext", "content 1");
testFile("nifi_test/tstFile1.ext", "content 2");
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP conflict resolution reject", "[PutSFTP][conflict-resolution]") {
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_REJECT);
createFile(src_dir, "tstFile1.ext", "content 1");
REQUIRE(0 == utils::file::FileUtils::create_dir(utils::file::FileUtils::concat_path(dst_dir, "vfs/nifi_test")));
createFile(utils::file::FileUtils::concat_path(dst_dir, "vfs"), "nifi_test/tstFile1.ext", "content 2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship reject"));
testFile("nifi_test/tstFile1.ext", "content 2");
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP conflict resolution fail", "[PutSFTP][conflict-resolution]") {
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_FAIL);
createFile(src_dir, "tstFile1.ext", "content 1");
REQUIRE(0 == utils::file::FileUtils::create_dir(utils::file::FileUtils::concat_path(dst_dir, "vfs/nifi_test")));
createFile(utils::file::FileUtils::concat_path(dst_dir, "vfs"), "nifi_test/tstFile1.ext", "content 2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship failure"));
testFile("nifi_test/tstFile1.ext", "content 2");
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP conflict resolution ignore", "[PutSFTP][conflict-resolution]") {
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_IGNORE);
createFile(src_dir, "tstFile1.ext", "content 1");
REQUIRE(0 == utils::file::FileUtils::create_dir(utils::file::FileUtils::concat_path(dst_dir, "vfs/nifi_test")));
createFile(utils::file::FileUtils::concat_path(dst_dir, "vfs"), "nifi_test/tstFile1.ext", "content 2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Routing tstFile1.ext to SUCCESS despite a file with the same name already existing"));
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship success"));
testFile("nifi_test/tstFile1.ext", "content 2");
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP conflict resolution replace", "[PutSFTP][conflict-resolution]") {
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_REPLACE);
createFile(src_dir, "tstFile1.ext", "content 1");
REQUIRE(0 == utils::file::FileUtils::create_dir(utils::file::FileUtils::concat_path(dst_dir, "vfs/nifi_test")));
createFile(utils::file::FileUtils::concat_path(dst_dir, "vfs"), "nifi_test/tstFile1.ext", "content 2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship success"));
testFile("nifi_test/tstFile1.ext", "content 1");
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP conflict resolution none", "[PutSFTP][conflict-resolution]") {
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_NONE);
createFile(src_dir, "tstFile1.ext", "content 1");
REQUIRE(0 == utils::file::FileUtils::create_dir(utils::file::FileUtils::concat_path(dst_dir, "vfs/nifi_test")));
createFile(utils::file::FileUtils::concat_path(dst_dir, "vfs"), "nifi_test/tstFile1.ext", "content 2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship failure"));
testFile("nifi_test/tstFile1.ext", "content 2");
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP conflict resolution with directory existing at target", "[PutSFTP][conflict-resolution]") {
bool should_predetect_failure = true;
SECTION("with conflict resolution rename") {
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_RENAME);
}
SECTION("with conflict resolution reject") {
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_REJECT);
}
SECTION("with conflict resolution fail") {
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_FAIL);
}
SECTION("with conflict resolution ignore") {
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_IGNORE);
}
SECTION("with conflict resolution replace") {
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_REPLACE);
}
SECTION("with conflict resolution none") {
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_NONE);
should_predetect_failure = false;
}
createFile(src_dir, "tstFile1.ext", "content 1");
REQUIRE(0 == utils::file::FileUtils::create_dir(utils::file::FileUtils::concat_path(dst_dir, "vfs/nifi_test")));
REQUIRE(0 == utils::file::FileUtils::create_dir(utils::file::FileUtils::concat_path(dst_dir, "vfs/nifi_test/tstFile1.ext")));
testController.runSession(plan, true);
if (should_predetect_failure) {
REQUIRE(LogTestController::getInstance().contains("Rejecting tstFile1.ext because a directory with the same name already exists"));
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship reject"));
} else {
REQUIRE(LogTestController::getInstance().contains("Failed to rename remote file \"nifi_test/.tstFile1.ext\" to \"nifi_test/tstFile1.ext\", error: LIBSSH2_FX_FILE_ALREADY_EXISTS"));
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship failure"));
}
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP reject zero-byte false", "[PutSFTP]") {
plan->setProperty(put, "Reject Zero-Byte Files", "false");
createFile(src_dir, "tstFile1.ext", "");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship success"));
testFile("nifi_test/tstFile1.ext", "");
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP reject zero-byte true", "[PutSFTP]") {
plan->setProperty(put, "Reject Zero-Byte Files", "true");
createFile(src_dir, "tstFile1.ext", "");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Rejecting tstFile1.ext because it is zero bytes"));
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship reject"));
testFileNotExists("nifi_test/tstFile1.ext");
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP set mtime", "[PutSFTP]") {
plan->setProperty(put, "Last Modified Time", "2065-01-24T05:20:00Z");
createFile(src_dir, "tstFile1.ext", "content 1");
testController.runSession(plan, true);
testFile("nifi_test/tstFile1.ext", "content 1");
testModificationTime("nifi_test/tstFile1.ext", 3000000000LL);
}
#ifndef WIN32
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP set permissions", "[PutSFTP]") {
plan->setProperty(put, "Permissions", "0613");
createFile(src_dir, "tstFile1.ext", "content 1");
testController.runSession(plan, true);
testFile("nifi_test/tstFile1.ext", "content 1");
testPermissions("nifi_test/tstFile1.ext", 0613);
}
#endif
#ifndef WIN32
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP set uid and gid", "[PutSFTP]") {
if (getuid() != 0) {
std::cerr << "!!!! This test ONLY works as root, because it needs to chown. Exiting. !!!!" << std::endl;
return;
}
plan->setProperty(put, "Remote Owner", "1234");
plan->setProperty(put, "Remote Group", "4567");
createFile(src_dir, "tstFile1.ext", "content 1");
testController.runSession(plan, true);
testFile("nifi_test/tstFile1.ext", "content 1");
testOwner("nifi_test/tstFile1.ext", 1234);
testGroup("nifi_test/tstFile1.ext", 4567);
}
#endif
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP disable directory creation", "[PutSFTP]") {
plan->setProperty(put, "Create Directory", "false");
createFile(src_dir, "tstFile1.ext", "content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship failure"));
testFileNotExists("nifi_test/tstFile1.ext");
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP test dot rename", "[PutSFTP]") {
bool should_fail = false;
SECTION("with dot rename enabled") {
plan->setProperty(put, "Dot Rename", "true");
should_fail = true;
}
SECTION("with dot rename disabled") {
plan->setProperty(put, "Dot Rename", "false");
should_fail = false;
}
createFile(src_dir, "tstFile1.ext", "content 1");
/*
* We create the would-be dot renamed file in the target, and because we don't overwrite temporary files,
* if we really use a dot renamed temporary file, we should fail.
*/
REQUIRE(0 == utils::file::FileUtils::create_dir(utils::file::FileUtils::concat_path(dst_dir, "vfs/nifi_test")));
createFile(utils::file::FileUtils::concat_path(dst_dir, "vfs"), "nifi_test/.tstFile1.ext", "");
testController.runSession(plan, true);
if (should_fail) {
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship failure"));
testFileNotExists("nifi_test/tstFile1.ext");
} else {
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship success"));
testFile("nifi_test/tstFile1.ext", "content 1");
}
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP test temporary filename", "[PutSFTP]") {
bool should_fail = false;
SECTION("with temporary filename set") {
/* Also test expression language */
plan->setProperty(put, "Temporary Filename", "${ filename:append('.temp') }");
should_fail = true;
}
SECTION("with temporary filename not set and dot rename disabled") {
plan->setProperty(put, "Dot Rename", "false");
should_fail = false;
}
createFile(src_dir, "tstFile1.ext", "content 1");
/*
* We create the would-be temporary file in the target, and because we don't overwrite temporary files,
* if we really use the temporary file, we should fail.
*/
REQUIRE(0 == utils::file::FileUtils::create_dir(utils::file::FileUtils::concat_path(dst_dir, "vfs/nifi_test")));
createFile(utils::file::FileUtils::concat_path(dst_dir, "vfs"), "nifi_test/tstFile1.ext.temp", "");
testController.runSession(plan, true);
if (should_fail) {
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship failure"));
testFileNotExists("nifi_test/tstFile1.ext");
} else {
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship success"));
testFile("nifi_test/tstFile1.ext", "content 1");
}
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP test temporary file cleanup", "[PutSFTP]") {
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_NONE);
createFile(src_dir, "tstFile1.ext", "content 1");
REQUIRE(0 == utils::file::FileUtils::create_dir(utils::file::FileUtils::concat_path(dst_dir, "vfs/nifi_test")));
createFile(utils::file::FileUtils::concat_path(dst_dir, "vfs"), "nifi_test/tstFile1.ext", "content 2");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship failure"));
testFile("nifi_test/tstFile1.ext", "content 2");
testFileNotExists("nifi_test/.tstFile1.ext");
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP test disable directory listing", "[PutSFTP]") {
bool should_list = false;
SECTION("with directory listing enabled") {
plan->setProperty(put, "Disable Directory Listing", "false");
should_list = true;
}
SECTION("with directory listing disabled") {
plan->setProperty(put, "Disable Directory Listing", "true");
should_list = false;
}
createFile(src_dir, "tstFile1.ext", "content 1");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("from PutSFTP to relationship success"));
testFileNotExists("nifi_test/inner/tstFile1.ext");
REQUIRE(should_list == LogTestController::getInstance().contains("Failed to stat remote path \"nifi_test\", error: LIBSSH2_FX_NO_SUCH_FILE"));
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP connection caching reuse", "[PutSFTP][connection-caching]") {
createFile(src_dir, "tstFile1.ext", "content 1");
createFile(src_dir, "tstFile2.ext", "content 2");
testController.runSession(plan, true);
testFile("nifi_test/tstFile1.ext", "content 1");
testFile("nifi_test/tstFile2.ext", "content 2");
REQUIRE(LogTestController::getInstance().contains("Adding nifiuser@localhost:" + std::to_string(sftp_server->getPort()) + " to SFTP connection pool"));
REQUIRE(LogTestController::getInstance().contains("Removing nifiuser@localhost:" + std::to_string(sftp_server->getPort()) + " from SFTP connection pool"));
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP connection caching does not reuse bad connection", "[PutSFTP][connection-caching]") {
createFile(src_dir, "tstFile1.ext", "content 1");
/* Simulate connection failure */
auto port = sftp_server->getPort();
sftp_server.reset();
try {
testController.runSession(plan, true);
} catch (std::exception &e) {
std::string expected("Process Session Operation:Can not find the transfer relationship for the updated flow");
REQUIRE(0 == std::string(e.what()).compare(0, expected.size(), expected));
}
testController.runSession(plan, true);
REQUIRE(false == LogTestController::getInstance().contains("Adding nifiuser@localhost:" + std::to_string(port) + " to SFTP connection pool"));
REQUIRE(LogTestController::getInstance().contains("Cannot connect to SFTP server"));
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP connection caching reaches limit", "[PutSFTP][connection-caching]") {
plan = testController.createPlan();
get_file = plan->addProcessor(
"GetFile",
"GetFile");
auto extract_text = plan->addProcessor(
"ExtractText",
"ExtractText",
core::Relationship("success", "d"),
true);
put = plan->addProcessor(
"PutSFTP",
"PutSFTP",
core::Relationship("success", "d"),
true);
plan->addProcessor("LogAttribute",
"LogAttribute",
{ core::Relationship("success", "d"),
core::Relationship("reject", "d"),
core::Relationship("failure", "d") },
true);
// Configure GetFile processor
plan->setProperty(get_file, "Batch Size", "1");
plan->setProperty(get_file, "Input Directory", src_dir);
// Configure ExtractText processor
plan->setProperty(extract_text, "Attribute", "port_num");
// Configure PutSFTP processor
plan->setProperty(put, "Hostname", "localhost");
plan->setProperty(put, "Port", "${'port_num'}");
plan->setProperty(put, "Username", "nifiuser");
plan->setProperty(put, "Password", "nifipassword");
plan->setProperty(put, "Remote Path", "nifi_test/");
plan->setProperty(put, "Create Directory", "true");
plan->setProperty(put, "Batch Size", "2");
plan->setProperty(put, "Connection Timeout", "30 sec");
plan->setProperty(put, "Data Timeout", "30 sec");
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_RENAME);
plan->setProperty(put, "Strict Host Key Checking", "false");
plan->setProperty(put, "Send Keep Alive On Timeout", "true");
plan->setProperty(put, "Use Compression", "false");
plan->setProperty(put, "Reject Zero-Byte Files", "true");
sftp_server.reset();
std::vector<std::vector<char>> dst_dirs;
std::vector<std::unique_ptr<SFTPTestServer>> sftp_servers;
std::string tmp_dir_format("/tmp/sftpd.XXXXXX");
for (size_t i = 0; i < 10; i++) {
dst_dirs.emplace_back(tmp_dir_format.data(), tmp_dir_format.data() + tmp_dir_format.size() + 1);
testController.createTempDirectory(dst_dirs.back().data());
sftp_servers.emplace_back(new SFTPTestServer(dst_dirs.back().data()));
REQUIRE(true == sftp_servers.back()->start());
createFile(src_dir, "tstFile" + std::to_string(i) + ".ext", std::to_string(sftp_servers.back()->getPort()));
testController.runSession(plan, true);
plan->reset();
if (i == 8) {
REQUIRE(LogTestController::getInstance().contains("SFTP connection pool is full, removing nifiuser@localhost:" + std::to_string(sftp_servers[0]->getPort())));
REQUIRE(LogTestController::getInstance().contains("Closing SFTPClient for localhost:" + std::to_string(sftp_servers[0]->getPort())));
REQUIRE(LogTestController::getInstance().contains("Adding nifiuser@localhost:" + std::to_string(sftp_servers[8]->getPort()) + " to SFTP connection pool"));
} else if (i == 9) {
REQUIRE(LogTestController::getInstance().contains("SFTP connection pool is full, removing nifiuser@localhost:" + std::to_string(sftp_servers[1]->getPort())));
REQUIRE(LogTestController::getInstance().contains("Closing SFTPClient for localhost:" + std::to_string(sftp_servers[1]->getPort())));
REQUIRE(LogTestController::getInstance().contains("Adding nifiuser@localhost:" + std::to_string(sftp_servers[9]->getPort()) + " to SFTP connection pool"));
}
}
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP batching two files in one batch", "[PutSFTP][batching]") {
plan->setProperty(put, "Batch Size", "2");
createFile(src_dir, "tstFile1.ext", "content 1");
createFile(src_dir, "tstFile2.ext", "content 2");
testController.runSession(plan, true);
REQUIRE(2U == directoryContentCount(std::string(dst_dir) + "/vfs/nifi_test"));
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP batching two files in two batches", "[PutSFTP][batching]") {
plan->setProperty(put, "Batch Size", "1");
createFile(src_dir, "tstFile1.ext", "content 1");
createFile(src_dir, "tstFile2.ext", "content 2");
testController.runSession(plan, true);
REQUIRE(1U == directoryContentCount(std::string(dst_dir) + "/vfs/nifi_test"));
plan->reset();
createFile(src_dir, "tstFile1.ext", "content 1");
createFile(src_dir, "tstFile2.ext", "content 2");
testController.runSession(plan, true);
REQUIRE(2U == directoryContentCount(std::string(dst_dir) + "/vfs/nifi_test"));
plan->reset();
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP batching does not fail even if one is routed to Failure", "[PutSFTP][batching]") {
plan->setProperty(put, "Batch Size", "3");
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_FAIL);
createFile(src_dir, "tstFile1.ext", "content 1");
createFile(src_dir, "tstFile2.ext", "content 2");
createFile(src_dir, "tstFile3.ext", "content 3");
REQUIRE(0 == utils::file::FileUtils::create_dir(utils::file::FileUtils::concat_path(dst_dir, "vfs/nifi_test")));
createFile(utils::file::FileUtils::concat_path(dst_dir, "vfs"), "nifi_test/tstFile1.ext", "content other");
createFile(utils::file::FileUtils::concat_path(dst_dir, "vfs"), "nifi_test/tstFile2.ext", "content other");
testController.runSession(plan, true);
testFile("nifi_test/tstFile3.ext", "content 3");
REQUIRE(LogTestController::getInstance().contains("Routing tstFile1.ext to FAILURE because a file with the same name already exists"));
REQUIRE(LogTestController::getInstance().contains("Routing tstFile2.ext to FAILURE because a file with the same name already exists"));
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP put large file", "[PutSFTP]") {
std::mt19937 rng(std::random_device{}());
std::string content(4 * 1024 * 1024U, '\0');
std::generate_n(content.begin(), 4 * 1024 * 1024U, std::ref(rng));
createFile(src_dir, "tstFile.ext", content);
testController.runSession(plan, true);
testFile("nifi_test/tstFile.ext", content);
}
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP expression language test", "[PutSFTP]") {
plan = testController.createPlan();
get_file = plan->addProcessor(
"GetFile",
"GetFile");
auto update_attribute = plan->addProcessor(
"UpdateAttribute",
"UpdateAttribute",
core::Relationship("success", "d"),
true);
put = plan->addProcessor(
"PutSFTP",
"PutSFTP",
core::Relationship("success", "d"),
true);
plan->addProcessor("LogAttribute",
"LogAttribute",
{ core::Relationship("success", "d"),
core::Relationship("reject", "d"),
core::Relationship("failure", "d") },
true);
// Configure GetFile processor
plan->setProperty(get_file, "Input Directory", src_dir);
// Configure UpdateAttribute processor
plan->setProperty(update_attribute, "attr_Hostname", "localhost", true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Port", std::to_string(sftp_server->getPort()), true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Username", "nifiuser", true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Password", "nifipassword", true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Private Key Path",
utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/id_rsa"), true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Private Key Passphrase", "privatekeypassword", true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Remote Path", "nifi_test/", true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Temporary Filename", "tempfile", true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Last Modified Time", "2065-01-24T05:20:00Z", true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Permissions", "rw-------", true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Remote Owner", "1234", true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Remote Group", "5678", true /*dynamic*/);
// Configure PutSFTP processor
plan->setProperty(put, "Hostname", "${'attr_Hostname'}");
plan->setProperty(put, "Port", "${'attr_Port'}");
plan->setProperty(put, "Username", "${'attr_Username'}");
plan->setProperty(put, "Password", "${'attr_Password'}");
plan->setProperty(put, "Private Key Path", "${'attr_Private Key Path'}");
plan->setProperty(put, "Private Key Passphrase", "${'attr_Private Key Passphrase'}");
plan->setProperty(put, "Remote Path", "${'attr_Remote Path'}");
plan->setProperty(put, "Temporary Filename", "${'attr_Temporary Filename'}");
plan->setProperty(put, "Last Modified Time", "${'attr_Last Modified Time'}");
plan->setProperty(put, "Permissions", "${'attr_Permissions'}");
plan->setProperty(put, "Remote Owner", "${'attr_Remote Owner'}");
plan->setProperty(put, "Remote Group", "${'attr_Remote Group'}");
plan->setProperty(put, "Create Directory", "true");
plan->setProperty(put, "Batch Size", "2");
plan->setProperty(put, "Connection Timeout", "30 sec");
plan->setProperty(put, "Data Timeout", "30 sec");
plan->setProperty(put, "Conflict Resolution", processors::PutSFTP::CONFLICT_RESOLUTION_RENAME);
plan->setProperty(put, "Strict Host Key Checking", "false");
plan->setProperty(put, "Send Keep Alive On Timeout", "true");
plan->setProperty(put, "Use Compression", "false");
plan->setProperty(put, "Reject Zero-Byte Files", "true");
createFile(src_dir, "tstFile.ext", "some content");
testController.runSession(plan, true);
testFile("nifi_test/tstFile.ext", "some content");
}