blob: d7e95c7436dabf2399607cecffcbde57d3f865fa [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sys/stat.h>
#undef NDEBUG
#include <cassert>
#include <cstring>
#include <utility>
#include <chrono>
#include <fstream>
#include <memory>
#include <string>
#include <thread>
#include <type_traits>
#include <vector>
#include <iostream>
#include <sstream>
#include <algorithm>
#include <functional>
#include <iterator>
#include <random>
#ifndef WIN32
#include <unistd.h>
#endif
#include "TestBase.h"
#include "utils/StringUtils.h"
#include "utils/file/FileUtils.h"
#include "core/Core.h"
#include "core/logging/Logger.h"
#include "core/ProcessGroup.h"
#include "core/yaml/YamlConfiguration.h"
#include "FlowController.h"
#include "properties/Configure.h"
#include "unit/ProvenanceTestHelper.h"
#include "io/StreamFactory.h"
#include "processors/FetchSFTP.h"
#include "processors/GenerateFlowFile.h"
#include "processors/LogAttribute.h"
#include "processors/UpdateAttribute.h"
#include "processors/PutFile.h"
#include "tools/SFTPTestServer.h"
class FetchSFTPTestsFixture {
public:
FetchSFTPTestsFixture()
: src_dir(strdup("/tmp/sftps.XXXXXX"))
, dst_dir(strdup("/tmp/sftpd.XXXXXX")) {
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setDebug<minifi::FlowController>();
LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
LogTestController::getInstance().setDebug<minifi::core::ProcessGroup>();
LogTestController::getInstance().setDebug<minifi::core::Processor>();
LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
LogTestController::getInstance().setDebug<processors::GenerateFlowFile>();
LogTestController::getInstance().setTrace<minifi::utils::SFTPClient>();
LogTestController::getInstance().setTrace<processors::FetchSFTP>();
LogTestController::getInstance().setTrace<processors::PutFile>();
LogTestController::getInstance().setDebug<processors::LogAttribute>();
LogTestController::getInstance().setDebug<SFTPTestServer>();
// Create temporary directories
testController.createTempDirectory(src_dir);
REQUIRE(src_dir != nullptr);
testController.createTempDirectory(dst_dir);
REQUIRE(dst_dir != nullptr);
// Start SFTP server
sftp_server = std::unique_ptr<SFTPTestServer>(new SFTPTestServer(src_dir));
REQUIRE(true == sftp_server->start());
// Build MiNiFi processing graph
plan = testController.createPlan();
generate_flow_file = plan->addProcessor(
"GenerateFlowFile",
"GenerateFlowFile");
update_attribute = plan->addProcessor("UpdateAttribute",
"UpdateAttribute",
core::Relationship("success", "d"),
true);
fetch_sftp = plan->addProcessor(
"FetchSFTP",
"FetchSFTP",
core::Relationship("success", "d"),
true);
plan->addProcessor("LogAttribute",
"LogAttribute",
{ core::Relationship("success", "d"),
core::Relationship("comms.failure", "d"),
core::Relationship("not.found", "d"),
core::Relationship("permission.denied", "d") },
true);
put_file = plan->addProcessor("PutFile",
"PutFile",
core::Relationship("success", "d"),
true);
// Configure GenerateFlowFile processor
plan->setProperty(generate_flow_file, "File Size", "1B");
// Configure FetchSFTP processor
plan->setProperty(fetch_sftp, "Hostname", "localhost");
plan->setProperty(fetch_sftp, "Port", std::to_string(sftp_server->getPort()));
plan->setProperty(fetch_sftp, "Username", "nifiuser");
plan->setProperty(fetch_sftp, "Password", "nifipassword");
plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_NONE);
plan->setProperty(fetch_sftp, "Connection Timeout", "30 sec");
plan->setProperty(fetch_sftp, "Data Timeout", "30 sec");
plan->setProperty(fetch_sftp, "Strict Host Key Checking", "false");
plan->setProperty(fetch_sftp, "Send Keep Alive On Timeout", "true");
plan->setProperty(fetch_sftp, "Use Compression", "false");
// Configure PutFile processor
plan->setProperty(put_file, "Directory", std::string(dst_dir) + "/${path}");
plan->setProperty(put_file, "Conflict Resolution Strategy", processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL);
plan->setProperty(put_file, "Create Missing Directories", "true");
}
virtual ~FetchSFTPTestsFixture() {
free(src_dir);
free(dst_dir);
LogTestController::getInstance().reset();
}
// Create source file
void createFile(const std::string& relative_path, const std::string& content) {
std::fstream file;
std::stringstream ss;
ss << src_dir << "/vfs/" << relative_path;
utils::file::FileUtils::create_dir(utils::file::FileUtils::get_parent_path(ss.str())); // TODO
file.open(ss.str(), std::ios::out);
file << content;
file.close();
}
enum TestWhere {
IN_DESTINATION,
IN_SOURCE
};
void testFile(TestWhere where, const std::string& relative_path, const std::string& expected_content) {
std::stringstream resultFile;
if (where == IN_DESTINATION) {
resultFile << dst_dir << "/" << relative_path;
} else {
resultFile << src_dir << "/vfs/" << relative_path;
#ifndef WIN32
/* Workaround for mina-sshd setting the read file's permissions to 0000 */
REQUIRE(0 == chmod(resultFile.str().c_str(), 0644));
#endif
}
std::ifstream file(resultFile.str());
REQUIRE(true == file.good());
std::stringstream content;
std::vector<char> buffer(1024U);
while (file) {
file.read(buffer.data(), buffer.size());
content << std::string(buffer.data(), file.gcount());
}
REQUIRE(expected_content == content.str());
}
void testFileNotExists(TestWhere where, const std::string& relative_path) {
std::stringstream resultFile;
if (where == IN_DESTINATION) {
resultFile << dst_dir << "/" << relative_path;
} else {
resultFile << src_dir << "/vfs/" << relative_path;
#ifndef WIN32
/* Workaround for mina-sshd setting the read file's permissions to 0000 */
REQUIRE(-1 == chmod(resultFile.str().c_str(), 0644));
#endif
}
std::ifstream file(resultFile.str());
REQUIRE(false == file.is_open());
REQUIRE(false == file.good());
}
protected:
char *src_dir;
char *dst_dir;
std::unique_ptr<SFTPTestServer> sftp_server;
TestController testController;
std::shared_ptr<TestPlan> plan;
std::shared_ptr<core::Processor> generate_flow_file;
std::shared_ptr<core::Processor> update_attribute;
std::shared_ptr<core::Processor> fetch_sftp;
std::shared_ptr<core::Processor> put_file;
};
TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP fetch one file", "[FetchSFTP][basic]") {
plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
createFile("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
}
TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP public key authentication", "[FetchSFTP][basic]") {
plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
plan->setProperty(fetch_sftp, "Private Key Path", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/id_rsa"));
plan->setProperty(fetch_sftp, "Private Key Passphrase", "privatekeypassword");
createFile("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
REQUIRE(LogTestController::getInstance().contains("Successfully authenticated with publickey"));
REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
}
TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP fetch non-existing file", "[FetchSFTP][basic]") {
plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Failed to open remote file \"nifi_test/tstFile.ext\", error: LIBSSH2_FX_NO_SUCH_FILE"));
REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship not.found"));
}
#ifndef WIN32
TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP fetch non-readable file", "[FetchSFTP][basic]") {
if (getuid() == 0) {
std::cerr << "!!!! This test does NOT work as root. Exiting. !!!!" << std::endl;
return;
}
plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
createFile("nifi_test/tstFile.ext", "Test content 1");
REQUIRE(0 == chmod((std::string(src_dir) + "/vfs/nifi_test/tstFile.ext").c_str(), 0000));
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Failed to open remote file \"nifi_test/tstFile.ext\", error: LIBSSH2_FX_PERMISSION_DENIED"));
REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship permission.denied"));
}
#endif
TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP fetch connection error", "[FetchSFTP][basic]") {
plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
createFile("nifi_test/tstFile.ext", "Test content 1");
/* Run it once normally to open the connection */
testController.runSession(plan, true);
plan->reset();
/* Stop the server to create a connection error */
sftp_server.reset();
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Failed to open remote file \"nifi_test/tstFile.ext\" due to an underlying SSH error"));
REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship comms.failure"));
}
TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP Completion Strategy Delete File success", "[FetchSFTP][completion-strategy]") {
plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_DELETE_FILE);
createFile("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
testFileNotExists(IN_SOURCE, "nifi_test/tstFile.ext");
testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
}
#ifndef WIN32
TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP Completion Strategy Delete File fail", "[FetchSFTP][completion-strategy]") {
if (getuid() == 0) {
std::cerr << "!!!! This test does NOT work as root. Exiting. !!!!" << std::endl;
return;
}
plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_DELETE_FILE);
createFile("nifi_test/tstFile.ext", "Test content 1");
/* By making the parent directory non-writable we make it impossible do delete the source file */
REQUIRE(0 == chmod((std::string(src_dir) + "/vfs/nifi_test").c_str(), 0500));
testController.runSession(plan, true);
/* We should succeed even if the completion strategy fails */
testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
REQUIRE(LogTestController::getInstance().contains("Failed to remove remote file \"nifi_test/tstFile.ext\", error: LIBSSH2_FX_PERMISSION_DENIED"));
REQUIRE(LogTestController::getInstance().contains("Completion Strategy is Delete File, but failed to delete remote file \"nifi_test/tstFile.ext\""));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
}
#endif
TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP Completion Strategy Move File success", "[FetchSFTP][completion-strategy]") {
plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_MOVE_FILE);
plan->setProperty(fetch_sftp, "Move Destination Directory", "nifi_done/");
plan->setProperty(fetch_sftp, "Create Directory", "true");
createFile("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
testFileNotExists(IN_SOURCE, "nifi_test/tstFile.ext");
testFile(IN_SOURCE, "nifi_done/tstFile.ext", "Test content 1");
testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
}
TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP Completion Strategy Move File fail", "[FetchSFTP][completion-strategy]") {
plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_MOVE_FILE);
plan->setProperty(fetch_sftp, "Move Destination Directory", "nifi_done/");
/* The completion strategy should fail because the target directory does not exist and we don't create it */
plan->setProperty(fetch_sftp, "Create Directory", "false");
createFile("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
/* We should succeed even if the completion strategy fails */
testFileNotExists(IN_SOURCE, "nifi_done/tstFile.ext");
testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
REQUIRE(LogTestController::getInstance().contains("Failed to rename remote file \"nifi_test/tstFile.ext\" to \"nifi_done/tstFile.ext\", error: LIBSSH2_FX_NO_SUCH_FILE"));
REQUIRE(LogTestController::getInstance().contains("Completion Strategy is Move File, but failed to move file \"nifi_test/tstFile.ext\" to \"nifi_done/tstFile.ext\""));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
}
TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP expression language test", "[FetchSFTP]") {
plan->setProperty(update_attribute, "attr_Hostname", "localhost", true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Port", std::to_string(sftp_server->getPort()), true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Username", "nifiuser", true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Password", "nifipassword", true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Private Key Path",
utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/id_rsa"), true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Private Key Passphrase", "privatekeypassword", true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Remote File", "nifi_test/tstFile.ext", true /*dynamic*/);
plan->setProperty(update_attribute, "attr_Move Destination Directory", "nifi_done/", true /*dynamic*/);
plan->setProperty(fetch_sftp, "Hostname", "${'attr_Hostname'}");
plan->setProperty(fetch_sftp, "Port", "${'attr_Port'}");
plan->setProperty(fetch_sftp, "Username", "${'attr_Username'}");
plan->setProperty(fetch_sftp, "Password", "${'attr_Password'}");
plan->setProperty(fetch_sftp, "Private Key Path", "${'attr_Private Key Path'}");
plan->setProperty(fetch_sftp, "Private Key Passphrase", "${'attr_Private Key Passphrase'}");
plan->setProperty(fetch_sftp, "Remote File", "${'attr_Remote File'}");
plan->setProperty(fetch_sftp, "Move Destination Directory", "${'attr_Move Destination Directory'}");
plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_MOVE_FILE);
plan->setProperty(fetch_sftp, "Create Directory", "true");
createFile("nifi_test/tstFile.ext", "Test content 1");
testController.runSession(plan, true);
testFileNotExists(IN_SOURCE, "nifi_test/tstFile.ext");
testFile(IN_SOURCE, "nifi_done/tstFile.ext", "Test content 1");
testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
REQUIRE(LogTestController::getInstance().contains("Successfully authenticated with publickey"));
REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
}