// blob: 0282cb95d2d39f82f88a624d6e1111346901f7fb [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sys/stat.h>
#undef NDEBUG
#include <cassert>
#include <cstring>
#include <utility>
#include <chrono>
#include <fstream>
#include <memory>
#include <string>
#include <thread>
#include <type_traits>
#include <vector>
#include <iostream>
#include <sstream>
#include <algorithm>
#include <functional>
#include <iterator>
#include <random>
#include "TestBase.h"
#include "utils/StringUtils.h"
#include "utils/file/FileUtils.h"
#include "core/Core.h"
#include "core/logging/Logger.h"
#include "core/ProcessGroup.h"
#include "core/yaml/YamlConfiguration.h"
#include "FlowController.h"
#include "properties/Configure.h"
#include "unit/ProvenanceTestHelper.h"
#include "io/StreamFactory.h"
#include "processors/FetchSFTP.h"
#include "processors/ListSFTP.h"
#include "processors/GenerateFlowFile.h"
#include "processors/LogAttribute.h"
#include "processors/UpdateAttribute.h"
#include "processors/PutFile.h"
#include "tools/SFTPTestServer.h"
/**
 * Catch fixture for the "ListSFTP then FetchSFTP" integration tests.
 *
 * On construction it:
 *  - raises the log level of the components involved,
 *  - creates a source and a destination temporary directory,
 *  - starts an SFTPTestServer on the source directory,
 *  - adds ListSFTP, FetchSFTP, LogAttribute and PutFile (in that order) to a
 *    fresh TestPlan and configures them.
 *
 * Source files served by the SFTP server live under "<src_dir>/vfs/", files
 * written by PutFile end up under "<dst_dir>/".
 */
class ListThenFetchSFTPTestsFixture {
 public:
  ListThenFetchSFTPTestsFixture()
      : src_dir(strdup("/tmp/sftps.XXXXXX"))
      , dst_dir(strdup("/tmp/sftpd.XXXXXX")) {
    LogTestController::getInstance().setTrace<TestPlan>();
    LogTestController::getInstance().setDebug<minifi::FlowController>();
    LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
    LogTestController::getInstance().setDebug<minifi::core::ProcessGroup>();
    LogTestController::getInstance().setDebug<minifi::core::Processor>();
    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
    LogTestController::getInstance().setDebug<processors::GenerateFlowFile>();
    LogTestController::getInstance().setTrace<minifi::utils::SFTPClient>();
    LogTestController::getInstance().setTrace<processors::ListSFTP>();
    LogTestController::getInstance().setTrace<processors::FetchSFTP>();
    LogTestController::getInstance().setTrace<processors::PutFile>();
    LogTestController::getInstance().setDebug<processors::LogAttribute>();
    LogTestController::getInstance().setDebug<SFTPTestServer>();

    // strdup() may fail; verify both buffers BEFORE they are handed to anything.
    REQUIRE(src_dir != nullptr);
    REQUIRE(dst_dir != nullptr);

    // Create temporary directories.
    // NOTE(review): the "XXXXXX" templates are presumably filled in place
    // (mkdtemp-style) by createTempDirectory - confirm against TestBase.h.
    testController.createTempDirectory(src_dir);
    testController.createTempDirectory(dst_dir);

    // Start SFTP server
    sftp_server.reset(new SFTPTestServer(src_dir));
    REQUIRE(true == sftp_server->start());

    // Build MiNiFi processing graph
    plan = testController.createPlan();
    list_sftp = plan->addProcessor(
        "ListSFTP",
        "ListSFTP");
    fetch_sftp = plan->addProcessor(
        "FetchSFTP",
        "FetchSFTP",
        core::Relationship("success", "d"),
        true);
    log_attribute = plan->addProcessor("LogAttribute",
        "LogAttribute",
        { core::Relationship("success", "d"),
          core::Relationship("comms.failure", "d"),
          core::Relationship("not.found", "d"),
          core::Relationship("permission.denied", "d") },
        true);
    put_file = plan->addProcessor("PutFile",
        "PutFile",
        core::Relationship("success", "d"),
        true);

    // Configure ListSFTP processor
    plan->setProperty(list_sftp, "Listing Strategy", processors::ListSFTP::LISTING_STRATEGY_TRACKING_TIMESTAMPS);
    plan->setProperty(list_sftp, "Hostname", "localhost");
    plan->setProperty(list_sftp, "Port", std::to_string(sftp_server->getPort()));
    plan->setProperty(list_sftp, "Username", "nifiuser");
    plan->setProperty(list_sftp, "Password", "nifipassword");
    plan->setProperty(list_sftp, "Search Recursively", "false");
    plan->setProperty(list_sftp, "Follow symlink", "false");
    plan->setProperty(list_sftp, "Ignore Dotted Files", "false");
    plan->setProperty(list_sftp, "Strict Host Key Checking", "false");
    plan->setProperty(list_sftp, "Connection Timeout", "30 sec");
    plan->setProperty(list_sftp, "Data Timeout", "30 sec");
    plan->setProperty(list_sftp, "Send Keep Alive On Timeout", "true");
    plan->setProperty(list_sftp, "Minimum File Age", "0 sec");
    plan->setProperty(list_sftp, "Minimum File Size", "0 B");
    // This property used to be set twice (auto-detect first, then "Seconds");
    // only the last assignment was effective, so just that one is kept.
    plan->setProperty(list_sftp, "Target System Timestamp Precision", "Seconds");
    plan->setProperty(list_sftp, "Remote Path", "nifi_test/");
    plan->setProperty(list_sftp, "State File", std::string(src_dir) + "/state");

    // Configure FetchSFTP processor
    plan->setProperty(fetch_sftp, "Hostname", "localhost");
    plan->setProperty(fetch_sftp, "Port", std::to_string(sftp_server->getPort()));
    plan->setProperty(fetch_sftp, "Username", "nifiuser");
    plan->setProperty(fetch_sftp, "Password", "nifipassword");
    plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_NONE);
    plan->setProperty(fetch_sftp, "Connection Timeout", "30 sec");
    plan->setProperty(fetch_sftp, "Data Timeout", "30 sec");
    plan->setProperty(fetch_sftp, "Strict Host Key Checking", "false");
    plan->setProperty(fetch_sftp, "Send Keep Alive On Timeout", "true");
    plan->setProperty(fetch_sftp, "Use Compression", "false");
    plan->setProperty(fetch_sftp, "Remote File", "${path}/${filename}");

    // Configure LogAttribute processor
    plan->setProperty(log_attribute, "FlowFiles To Log", "0");

    // Configure PutFile processor
    plan->setProperty(put_file, "Directory", std::string(dst_dir) + "/${path}");
    plan->setProperty(put_file, "Conflict Resolution Strategy", processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL);
    plan->setProperty(put_file, "Create Missing Directories", "true");
  }

  // The fixture owns the raw strdup() buffers below; copying it would free
  // them twice, so copying is disabled.
  ListThenFetchSFTPTestsFixture(const ListThenFetchSFTPTestsFixture&) = delete;
  ListThenFetchSFTPTestsFixture& operator=(const ListThenFetchSFTPTestsFixture&) = delete;

  virtual ~ListThenFetchSFTPTestsFixture() {
    free(src_dir);
    free(dst_dir);
    LogTestController::getInstance().reset();
  }

  /**
   * Creates "<src_dir>/vfs/<relative_path>" with the given content, creating
   * every parent directory on the way.
   *
   * @param relative_path path of the file relative to the server's vfs root
   * @param content bytes written to the file
   * @param modification_timestamp if non-zero, the file's last write time is
   *        set to this value (and the call is REQUIREd to succeed)
   */
  void createFile(const std::string& relative_path, const std::string& content, uint64_t modification_timestamp = 0U) {
    const std::string full_path = std::string(src_dir) + "/vfs/" + relative_path;

    // Collect the parent chain innermost-first, then create it outermost-first.
    std::vector<std::string> parent_dirs;
    for (std::string dir = utils::file::FileUtils::get_parent_path(full_path);
         !dir.empty();
         dir = utils::file::FileUtils::get_parent_path(dir)) {
      parent_dirs.push_back(dir);
    }
    for (auto it = parent_dirs.rbegin(); it != parent_dirs.rend(); ++it) {
      // Return value intentionally ignored: most of these directories already exist.
      utils::file::FileUtils::create_dir(*it);
    }

    std::ofstream file(full_path);
    // Fail right here instead of letting a later content check fail confusingly.
    REQUIRE(true == file.is_open());
    file << content;
    file.close();
    REQUIRE(false == file.fail());

    if (modification_timestamp != 0U) {
      REQUIRE(true == utils::file::FileUtils::set_last_write_time(full_path, modification_timestamp));
    }
  }

  /**
   * Same as createFile(), with the modification time expressed as an offset
   * in seconds from "now" (default: 5 minutes in the past).
   */
  void createFileWithModificationTimeDiff(const std::string& relative_path, const std::string& content, int64_t modification_timediff = -300 /*5 minutes ago*/) {
    const time_t now = time(nullptr);
    createFile(relative_path, content, now + modification_timediff);
  }

  // Selects which directory tree a check is performed against.
  enum TestWhere {
    IN_DESTINATION,
    IN_SOURCE
  };

  /**
   * REQUIREs that the file exists at the selected location and that its whole
   * content equals expected_content.
   */
  void testFile(TestWhere where, const std::string& relative_path, const std::string& expected_content) {
    const std::string path = resolvePath(where, relative_path);
#ifndef WIN32
    if (where != IN_DESTINATION) {
      /* Workaround for mina-sshd setting the read file's permissions to 0000 */
      REQUIRE(0 == chmod(path.c_str(), 0644));
    }
#endif
    std::ifstream file(path);
    REQUIRE(true == file.good());
    const std::string actual((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
    REQUIRE(expected_content == actual);
  }

  /**
   * REQUIREs that no file can be opened at the selected location.
   */
  void testFileNotExists(TestWhere where, const std::string& relative_path) {
    const std::string path = resolvePath(where, relative_path);
#ifndef WIN32
    if (where != IN_DESTINATION) {
      /* Workaround for mina-sshd setting the read file's permissions to 0000 */
      // chmod must fail here, since the file is expected to be absent.
      REQUIRE(-1 == chmod(path.c_str(), 0644));
    }
#endif
    std::ifstream file(path);
    REQUIRE(false == file.is_open());
    REQUIRE(false == file.good());
  }

 protected:
  char *src_dir;
  char *dst_dir;
  std::unique_ptr<SFTPTestServer> sftp_server;
  TestController testController;
  std::shared_ptr<TestPlan> plan;
  std::shared_ptr<core::Processor> list_sftp;
  std::shared_ptr<core::Processor> fetch_sftp;
  std::shared_ptr<core::Processor> log_attribute;
  std::shared_ptr<core::Processor> put_file;

 private:
  // Maps a relative path to "<dst_dir>/<path>" or "<src_dir>/vfs/<path>".
  std::string resolvePath(TestWhere where, const std::string& relative_path) const {
    if (where == IN_DESTINATION) {
      return std::string(dst_dir) + "/" + relative_path;
    }
    return std::string(src_dir) + "/vfs/" + relative_path;
  }
};
// Single file: after one full session the file must be readable, with
// unchanged content, both at its source location and in the destination.
TEST_CASE_METHOD(ListThenFetchSFTPTestsFixture, "ListSFTP then FetchSFTP one file", "[ListThenFetchSFTP][basic]") {
  const std::string remote_file = "nifi_test/tstFile.ext";
  const std::string payload = "Test content 1";

  // Uses the helper's default offset, i.e. a modification time in the past.
  createFileWithModificationTimeDiff(remote_file, payload);

  testController.runSession(plan, true);

  testFile(IN_SOURCE, remote_file, payload);
  testFile(IN_DESTINATION, remote_file, payload);
}
// Two files: the plan is stepped processor by processor; afterwards both
// files must be present, with unchanged content, in source and destination.
TEST_CASE_METHOD(ListThenFetchSFTPTestsFixture, "ListSFTP then FetchSFTP two files", "[ListThenFetchSFTP][basic]") {
  // (relative path, content) of every file taking part in the test
  const std::vector<std::pair<std::string, std::string>> files = {
    {"nifi_test/file1.ext", "Test content 1"},
    {"nifi_test/file2.ext", "Test content 2"},
  };

  for (const auto& entry : files) {
    createFileWithModificationTimeDiff(entry.first, entry.second);
  }

  plan->runNextProcessor();     // ListSFTP
  plan->runNextProcessor();     // FetchSFTP, first run
  plan->runCurrentProcessor();  // FetchSFTP, second run (presumably one flow file per run - confirm)
  plan->runNextProcessor();     // LogAttribute
  plan->runNextProcessor();     // PutFile, first run
  plan->runCurrentProcessor();  // PutFile, second run

  for (const auto& entry : files) {
    testFile(IN_SOURCE, entry.first, entry.second);
    testFile(IN_DESTINATION, entry.first, entry.second);
  }
}