blob: 99b643286d8482a4616e39c2116d7b92b936ca9c [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// loading extensions increases the baseline memory usage
// as we measure the absolute memory usage that would fail this test
#define EXTENSION_LIST "" // NOLINT(cppcoreguidelines-macro-usage)
#include <list>
#include "utils/gsl.h"
#include "utils/OsUtils.h"
#include "unit/TestUtils.h"
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "utils/Literals.h"
#include "core/repository/FileSystemRepository.h"
#include "utils/file/FileUtils.h"
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::test {
class TestFileSystemRepository : public minifi::core::repository::FileSystemRepository {
public:
using FileSystemRepository::FileSystemRepository;
std::list<std::string> getPurgeList() const {
return purge_list_;
}
};
TEST_CASE("Test Physical memory usage", "[testphysicalmemoryusage]") {
TestController controller;
auto dir = controller.createTempDirectory();
auto fs_repo = std::make_shared<minifi::core::repository::FileSystemRepository>();
auto config = std::make_shared<minifi::ConfigureImpl>();
config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
REQUIRE(fs_repo->initialize(config));
const auto start_memory = minifi::utils::OsUtils::getCurrentProcessPhysicalMemoryUsage();
REQUIRE(start_memory > 0);
auto content_session = fs_repo->createSession();
auto resource_id = content_session->create();
auto stream = content_session->write(resource_id);
size_t file_size = 20_MB;
std::span<const char> fragment = "well, hello there";
for (size_t i = 0; i < file_size / fragment.size() + 1; ++i) {
stream->write(as_bytes(fragment));
}
using org::apache::nifi::minifi::test::utils::verifyEventHappenedInPollTime;
CHECK(verifyEventHappenedInPollTime(5s, [&] {
const auto end_memory = minifi::utils::OsUtils::getCurrentProcessPhysicalMemoryUsage();
REQUIRE(end_memory > 0);
return end_memory < start_memory + int64_t{5_MB};
}, 100ms));
}
TEST_CASE("FileSystemRepository can clear orphan entries") {
TestController testController;
auto dir = testController.createTempDirectory();
auto configuration = std::make_shared<org::apache::nifi::minifi::ConfigureImpl>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
{
auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
REQUIRE(content_repo->initialize(configuration));
minifi::ResourceClaimImpl claim(content_repo);
content_repo->write(claim)->write("hi");
// ensure that the content is not deleted during resource claim destruction
content_repo->incrementStreamCount(claim);
}
REQUIRE(minifi::utils::file::list_dir_all(dir, testController.getLogger()).size() == 1);
auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
REQUIRE(content_repo->initialize(configuration));
content_repo->clearOrphans();
REQUIRE(minifi::utils::file::list_dir_all(dir, testController.getLogger()).empty());
}
TEST_CASE("FileSystemRepository can retry removing entry that previously failed to be removed") {
if (utils::runningAsUnixRoot())
SKIP("Cannot test insufficient permissions with root user");
TestController testController;
auto dir = testController.createTempDirectory();
auto configuration = std::make_shared<org::apache::nifi::minifi::ConfigureImpl>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
auto content_repo = std::make_shared<TestFileSystemRepository>();
REQUIRE(content_repo->initialize(configuration));
{
minifi::ResourceClaimImpl claim(content_repo);
content_repo->write(claim)->write("hi");
auto files = minifi::utils::file::list_dir_all(dir, testController.getLogger());
REQUIRE(files.size() == 1);
// ensure that the content is not deleted during resource claim destruction
utils::makeFileOrDirectoryNotWritable(dir);
}
utils::makeFileOrDirectoryWritable(dir);
REQUIRE(minifi::utils::file::list_dir_all(dir, testController.getLogger()).size() == 1);
{
minifi::ResourceClaimImpl claim(content_repo);
content_repo->write(claim)->write("hi");
REQUIRE(minifi::utils::file::list_dir_all(dir, testController.getLogger()).size() == 2);
}
REQUIRE(minifi::utils::file::list_dir_all(dir, testController.getLogger()).empty());
REQUIRE(content_repo->getPurgeList().empty());
}
TEST_CASE("FileSystemRepository removes non-existing resource file from purge list") {
if (utils::runningAsUnixRoot())
SKIP("Cannot test insufficient permissions with root user");
TestController testController;
auto dir = testController.createTempDirectory();
auto configuration = std::make_shared<org::apache::nifi::minifi::ConfigureImpl>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
auto content_repo = std::make_shared<TestFileSystemRepository>();
REQUIRE(content_repo->initialize(configuration));
std::string filename;
{
minifi::ResourceClaimImpl claim(content_repo);
content_repo->write(claim)->write("hi");
auto files = minifi::utils::file::list_dir_all(dir, testController.getLogger());
REQUIRE(files.size() == 1);
// ensure that the content is not deleted during resource claim destruction
filename = (files[0].first / files[0].second).string();
utils::makeFileOrDirectoryNotWritable(dir);
}
utils::makeFileOrDirectoryWritable(dir);
REQUIRE(std::filesystem::remove(filename));
REQUIRE(minifi::utils::file::list_dir_all(dir, testController.getLogger()).empty());
{
minifi::ResourceClaimImpl claim(content_repo);
content_repo->write(claim)->write("hi");
REQUIRE(minifi::utils::file::list_dir_all(dir, testController.getLogger()).size() == 1);
}
REQUIRE(minifi::utils::file::list_dir_all(dir, testController.getLogger()).empty());
REQUIRE(content_repo->getPurgeList().empty());
}
TEST_CASE("Append Claim") {
TestController testController;
auto dir = testController.createTempDirectory();
auto content_repo = std::make_shared<TestFileSystemRepository>();
auto configuration = std::make_shared<org::apache::nifi::minifi::ConfigureImpl>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
REQUIRE(content_repo->initialize(configuration));
const std::string content = "well hello there";
auto claim = std::make_shared<minifi::ResourceClaimImpl>(content_repo);
content_repo->write(*claim)->write(as_bytes(std::span(content)));
// requesting append before content end fails
CHECK(content_repo->lockAppend(*claim, 0) == nullptr);
auto lock = content_repo->lockAppend(*claim, content.length());
// trying to append to the end succeeds
CHECK(lock != nullptr);
// simultaneously trying to append to the same claim fails
CHECK(content_repo->lockAppend(*claim, content.length()) == nullptr);
// manually deleting append lock
lock.reset();
// appending after lock is released succeeds
lock = content_repo->lockAppend(*claim, content.length());
CHECK(lock != nullptr);
const std::string appended = "General Kenobi!";
content_repo->write(*claim, true)->write(as_bytes(std::span(appended)));
lock.reset();
// size has changed
CHECK(content_repo->lockAppend(*claim, content.length()) == nullptr);
CHECK(content_repo->lockAppend(*claim, content.length() + appended.length()) != nullptr);
}
} // namespace org::apache::nifi::minifi::test