blob: 2643edc4f5d76708c44a1699f7eb96ee7ab30b20 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <string>
#include <unordered_set>
#include <unordered_map>
#include <filesystem>
#include "TestBase.h"
#include "Catch.h"
#include "core/Property.h"
#include "core/Processor.h"
#include "processors/FetchFile.h"
#include "utils/TestUtils.h"
#include "utils/IntegrationTestUtils.h"
#include "SingleProcessorTestController.h"
using namespace std::literals::chrono_literals;
namespace {
class FetchFileTestFixture {
public:
FetchFileTestFixture();
~FetchFileTestFixture();
protected:
std::unordered_multiset<std::string> getDirContents(const std::string& dir_path) const;
std::shared_ptr<minifi::processors::FetchFile> fetch_file_processor_;
std::shared_ptr<minifi::test::SingleProcessorTestController> test_controller_;
const std::string input_dir_;
const std::string permission_denied_file_name_;
const std::string input_file_name_;
const std::string file_content_;
std::unordered_map<std::string, std::string> attributes_;
};
FetchFileTestFixture::FetchFileTestFixture()
: fetch_file_processor_(std::make_shared<minifi::processors::FetchFile>("FetchFile")),
test_controller_(std::make_shared<minifi::test::SingleProcessorTestController>(fetch_file_processor_)),
input_dir_(test_controller_->createTempDirectory()),
permission_denied_file_name_("permission_denied.txt"),
input_file_name_("test.txt"),
file_content_("The quick brown fox jumps over the lazy dog\n") {
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setTrace<minifi::processors::FetchFile>();
attributes_ = {{"absolute.path", input_dir_}, {"filename", input_file_name_}};
utils::putFileToDir(input_dir_, input_file_name_, file_content_);
utils::putFileToDir(input_dir_, permission_denied_file_name_, file_content_);
std::filesystem::permissions(input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_, static_cast<std::filesystem::perms>(0));
}
FetchFileTestFixture::~FetchFileTestFixture() {
std::filesystem::permissions(input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_, static_cast<std::filesystem::perms>(0644));
}
std::unordered_multiset<std::string> FetchFileTestFixture::getDirContents(const std::string& dir_path) const {
std::unordered_multiset<std::string> file_contents;
auto lambda = [&file_contents](const std::string& path, const std::string& filename) -> bool {
std::ifstream is(path + utils::file::FileUtils::get_separator() + filename, std::ifstream::binary);
file_contents.insert(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()));
return true;
};
utils::file::FileUtils::list_dir(dir_path, lambda, test_controller_->plan->getLogger(), false);
return file_contents;
}
TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default but non-existent file path", "[testFetchFile]") {
attributes_["filename"] = "non_existent.file";
const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::NotFound);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
REQUIRE(verifyLogLinePresenceInPollTime(1s, "[error] File to fetch was not found"));
}
TEST_CASE_METHOD(FetchFileTestFixture, "FileToFetch property set to a non-existent file path", "[testFetchFile]") {
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch, "/tmp/non_existent.file");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenFileNotFound, "INFO");
const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::NotFound);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
REQUIRE(verifyLogLinePresenceInPollTime(1s, "[info] File to fetch was not found"));
}
#ifndef WIN32
TEST_CASE_METHOD(FetchFileTestFixture, "Permission denied to read file", "[testFetchFile]") {
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch,
input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_);
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenPermissionDenied, "WARN");
const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::PermissionDenied);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
REQUIRE(verifyLogLinePresenceInPollTime(1s, "[warning] Read permission denied for file"));
}
#endif
TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default file path", "[testFetchFile]") {
const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
}
TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file from a custom path", "[testFetchFile]") {
REQUIRE(0 == utils::file::FileUtils::create_dir(input_dir_ + utils::file::FileUtils::get_separator() + "sub"));
utils::putFileToDir(input_dir_ + utils::file::FileUtils::get_separator() + "sub", input_file_name_, file_content_);
auto file_path = input_dir_ + utils::file::FileUtils::get_separator() + "sub" + utils::file::FileUtils::get_separator() + input_file_name_;
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch, file_path);
const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(utils::file::FileUtils::exists(file_path));
}
TEST_CASE_METHOD(FetchFileTestFixture, "Flow scheduling fails due to missing move destination directory when completion strategy is set to move file", "[testFetchFile]") {
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
REQUIRE_THROWS_AS(test_controller_->trigger("", attributes_), minifi::Exception);
}
TEST_CASE_METHOD(FetchFileTestFixture, "Flow fails due to move conflict", "[testFetchFile]") {
auto move_dir = test_controller_->createTempDirectory();
utils::putFileToDir(move_dir, input_file_name_, "old content");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Fail");
const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Failure);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
REQUIRE(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()) == "old content");
REQUIRE(utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
}
TEST_CASE_METHOD(FetchFileTestFixture, "Move specific properties are ignored when completion strategy is not move file", "[testFetchFile]") {
auto move_dir = test_controller_->createTempDirectory();
utils::putFileToDir(move_dir, input_file_name_, "old content");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Fail");
const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
}
TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved with replace file", "[testFetchFile]") {
auto move_dir = test_controller_->createTempDirectory();
utils::putFileToDir(move_dir, input_file_name_, "old content");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Replace File");
const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
REQUIRE(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()) == file_content_);
}
TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved with renaming file to a new random filename", "[testFetchFile]") {
auto move_dir = test_controller_->createTempDirectory();
utils::putFileToDir(move_dir, input_file_name_, "old content");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Rename");
const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
auto move_dir_contents = getDirContents(move_dir);
std::unordered_multiset<std::string> expected = {"old content", file_content_};
REQUIRE(move_dir_contents == expected);
}
TEST_CASE_METHOD(FetchFileTestFixture, "Move destination conflict is resolved with deleting the new file and keeping the old one", "[testFetchFile]") {
auto move_dir = test_controller_->createTempDirectory();
utils::putFileToDir(move_dir, input_file_name_, "old content");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Keep Existing");
const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
REQUIRE(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()) == "old content");
}
TEST_CASE_METHOD(FetchFileTestFixture, "Fetched file is moved to a new directory after flow completion", "[testFetchFile]") {
auto move_dir = test_controller_->createTempDirectory();
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
REQUIRE(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()) == file_content_);
}
TEST_CASE_METHOD(FetchFileTestFixture, "After flow completion the fetched file is moved to a non-existent directory which is created by the flow", "[testFetchFile]") {
auto move_dir = test_controller_->createTempDirectory();
move_dir = move_dir + utils::file::FileUtils::get_separator() + "temp";
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
std::ifstream is(move_dir + utils::file::FileUtils::get_separator() + input_file_name_, std::ifstream::binary);
REQUIRE(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()) == file_content_);
}
#ifndef WIN32
TEST_CASE_METHOD(FetchFileTestFixture, "Move completion strategy failure due to filesystem error still succeeds flow", "[testFetchFile]") {
auto move_dir = test_controller_->createTempDirectory();
utils::file::FileUtils::set_permissions(move_dir, 0);
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
REQUIRE(verifyLogLinePresenceInPollTime(1s, "completion strategy failed"));
utils::file::FileUtils::set_permissions(move_dir, 0644);
}
#endif
TEST_CASE_METHOD(FetchFileTestFixture, "Fetched file is deleted after flow completion", "[testFetchFile]") {
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Delete File");
const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
REQUIRE(!utils::file::FileUtils::exists(input_dir_ + utils::file::FileUtils::get_separator() + input_file_name_));
}
} // namespace