// blob: 4fed83d115b182e98748456fc0d64dd843560171 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <array>
#include <chrono>
#include <functional>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include "core/Core.h"
#include "core/repository/AtomicRepoEntries.h"
#include "core/RepositoryFactory.h"
#include "FlowFileRecord.h"
#include "FlowFileRepository.h"
#include "properties/Configure.h"
#include "unit/ProvenanceTestHelper.h"
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "catch2/matchers/catch_matchers_string.hpp"
#include "../../extensions/libarchive/MergeContent.h"
#include "core/repository/VolatileFlowFileRepository.h"
#include "../../extensions/rocksdb-repos/DatabaseContentRepository.h"
#include "unit/TestUtils.h"
#include "core/repository/FileSystemRepository.h"
#include "core/ProcessorImpl.h"
#include "Connection.h"
#include "core/ProcessContext.h"
#include "core/ProcessSessionFactory.h"
using ConnectionImpl = minifi::ConnectionImpl;
using Connection = minifi::Connection;
using MergeContent = minifi::processors::MergeContent;
using minifi::test::utils::verifyEventHappenedInPollTime;
namespace {
/// Placeholder processor that only serves as the source ("input") end of TestFlow's Input connection.
/// It declares no dynamic properties or relationships and accepts (but does not require) input.
class TestProcessor : public minifi::core::ProcessorImpl {
 public:
  using ProcessorImpl::ProcessorImpl;

  // processor traits
  static constexpr core::annotation::Input InputRequirement{core::annotation::Input::INPUT_ALLOWED};
  static constexpr bool IsSingleThreaded{false};
  static constexpr bool SupportsDynamicProperties{false};
  static constexpr bool SupportsDynamicRelationships{false};

  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
};
/// Minimal flow used by the repository-persistence tests in this file:
///
///   TestProcessor "source" --(Input, relationship "input")--> processor under test --(Output, relationshipToOutput)--> (no destination)
///
/// The processor under test and both connections are moved into a ROOT process group (`root_`), which the
/// test hands over to a FlowController. `processor_`, `input_` and `output_` are non-owning pointers into
/// that group. The "source" TestProcessor is owned by this object and is only used by write() to create
/// FlowFiles.
///
/// All processor/connection UUIDs come from function-local statics, so every TestFlow created in the same
/// process reuses the same ids. NOTE(review): this looks deliberate. It lets a TestFlow built after a
/// FlowController reload be matched with FlowFiles persisted by an earlier TestFlow. Confirm before changing.
struct TestFlow {
  /// @param ff_repository        FlowFile repository shared by both process contexts and both connections
  /// @param content_repo         content repository shared by both process contexts and both connections
  /// @param prov_repo            provenance repository shared by both process contexts
  /// @param processorGenerator   creates the processor under test for the given UUID
  /// @param relationshipToOutput relationship of the processor under test that feeds the Output connection
  TestFlow(const std::shared_ptr<core::Repository>& ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<core::Repository>& prov_repo,
      const std::function<std::unique_ptr<core::Processor>(utils::Identifier&)>& processorGenerator, const core::Relationship& relationshipToOutput)
      : ff_repository(ff_repository), content_repo(content_repo), prov_repo(prov_repo) {
    // setup the processor under test
    auto processor = processorGenerator(mainProcUUID());
    {
      processor_ = processor.get();
      processorContext = std::make_shared<core::ProcessContextImpl>(*processor_, nullptr, prov_repo, ff_repository, content_repo);
    }
    // setup INPUT processor
    {
      inputProcessor = minifi::test::utils::make_processor<TestProcessor>("source", inputProcUUID());
      inputContext = std::make_shared<core::ProcessContextImpl>(*inputProcessor, nullptr, prov_repo,
          ff_repository, content_repo);
    }
    // setup Input Connection: source -> processor under test
    auto input = std::make_unique<ConnectionImpl>(ff_repository, content_repo, "Input", inputConnUUID());
    {
      input_ = input.get();
      input->addRelationship({"input", "d"});
      input->setDestinationUUID(mainProcUUID());
      input->setSourceUUID(inputProcUUID());
      inputProcessor->addConnection(input.get());
    }
    // setup Output Connection: processor under test -> (no destination), polled directly by the tests
    auto output = std::make_unique<ConnectionImpl>(ff_repository, content_repo, "Output", outputConnUUID());
    {
      output_ = output.get();
      output->addRelationship(relationshipToOutput);
      output->setSourceUUID(mainProcUUID());
    }
    // setup ProcessGroup; it takes ownership of the processor under test and both connections
    {
      root_ = std::make_unique<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
      root_->addProcessor(std::move(processor));
      root_->addConnection(std::move(input));
      root_->addConnection(std::move(output));
    }
    // prepare the processor under test for execution
    processor_->setScheduledState(core::ScheduledState::RUNNING);
    process_session_factory_ = std::make_unique<core::ProcessSessionFactoryImpl>(processorContext);
    processor_->onSchedule(*processorContext, *process_session_factory_);
  }

  /// Creates a FlowFile with `data` as content on behalf of the source processor, transfers it to the
  /// "input" relationship (i.e. into the Input connection) and commits the session.
  /// @return the created FlowFile
  std::shared_ptr<core::FlowFile> write(const std::string& data) {
    minifi::io::BufferStream stream(data);
    core::ProcessSessionImpl sessionGenFlowFile(inputContext);
    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
    sessionGenFlowFile.importFrom(stream, flow);
    // before commit, the freshly imported claim is referenced by exactly this one FlowFile record
    REQUIRE(flow->getResourceClaim()->getFlowFileRecordOwnedCount() == 1);
    sessionGenFlowFile.transfer(flow, {"input", "d"});
    sessionGenFlowFile.commit();
    return flow;
  }

  /// Returns the content of `file` as a string, read through a new session of the processor under test.
  std::string read(const std::shared_ptr<core::FlowFile>& file) {
    return to_string(core::ProcessSessionImpl{processorContext}.readBuffer(file));
  }

  /// Runs onTrigger of the processor under test once in a fresh session and commits that session.
  void trigger() {
    auto session = std::make_shared<core::ProcessSessionImpl>(processorContext);
    processor_->onTrigger(*processorContext, *session);
    session->commit();
  }

  // Non-owning; owned by root_ (or by whoever root_ has been moved to).
  Connection* input_ = nullptr;
  Connection* output_ = nullptr;
  std::unique_ptr<core::ProcessGroup> root_;

 private:
  // Function-local statics: every TestFlow instance gets the same ids (see the struct comment for why).
  static utils::Identifier& mainProcUUID() {static auto id = utils::IdGenerator::getIdGenerator()->generate(); return id;}
  static utils::Identifier& inputProcUUID() {static auto id = utils::IdGenerator::getIdGenerator()->generate(); return id;}
  static utils::Identifier& inputConnUUID() {static auto id = utils::IdGenerator::getIdGenerator()->generate(); return id;}
  static utils::Identifier& outputConnUUID() {static auto id = utils::IdGenerator::getIdGenerator()->generate(); return id;}

  std::shared_ptr<core::Processor> inputProcessor;
  // Non-owning; owned by root_ (or by whoever root_ has been moved to).
  core::Processor* processor_ = nullptr;
  std::shared_ptr<core::Repository> ff_repository;
  std::shared_ptr<core::ContentRepository> content_repo;
  std::shared_ptr<core::Repository> prov_repo;
  std::shared_ptr<core::ProcessContext> inputContext;
  std::shared_ptr<core::ProcessContext> processorContext;
  std::unique_ptr<core::ProcessSessionFactory> process_session_factory_;
};
/// Processor generator for TestFlow. It builds a MergeContent processor with the given id and configures it
/// for text-delimited bin-pack concatenation ("_Header_" / "_Demarcator_" / "_Footer_"), MinEntries = 3,
/// MaxBinAge = 1 h, and the "original" relationship auto-terminated.
std::unique_ptr<core::Processor> setupMergeProcessor(const utils::Identifier& id) {
  namespace merge_opts = org::apache::nifi::minifi::processors::merge_content_options;

  auto merger = minifi::test::utils::make_processor<MergeContent>("MergeContent", id);
  merger->initialize();
  merger->setAutoTerminatedRelationships(std::array{core::Relationship{"original", "d"}});

  // how FlowFiles are merged
  REQUIRE(merger->setProperty(MergeContent::MergeFormat.name, std::string{merge_opts::MERGE_FORMAT_CONCAT_VALUE}));
  REQUIRE(merger->setProperty(MergeContent::MergeStrategy.name, std::string{merge_opts::MERGE_STRATEGY_BIN_PACK}));
  REQUIRE(merger->setProperty(MergeContent::DelimiterStrategy.name, std::string{merge_opts::DELIMITER_STRATEGY_TEXT}));

  // bin size; the tests rely on three FlowFiles being needed for a merge
  REQUIRE(merger->setProperty(MergeContent::MinEntries.name, "3"));

  // text delimiters wrapped around / between the merged contents
  REQUIRE(merger->setProperty(MergeContent::Header.name, "_Header_"));
  REQUIRE(merger->setProperty(MergeContent::Footer.name, "_Footer_"));
  REQUIRE(merger->setProperty(MergeContent::Demarcator.name, "_Demarcator_"));

  // long enough that bin age never triggers a merge during the test
  REQUIRE(merger->setProperty(MergeContent::MaxBinAge.name, "1 h"));

  return merger;
}
// Checks that FlowFiles held inside a processor survive a FlowController reload.
// MergeContent is configured (setupMergeProcessor) with MinEntries = 3. Two FlowFiles are fed to it in a
// first TestFlow, which must leave both connections empty. Then a second TestFlow with the same ids is
// loaded and a third FlowFile is written. The merged output must contain all three contents, so the first
// two must have been persisted and restored across the reload.
TEST_CASE("Processors Can Store FlowFiles", "[TestP1]") {
TestController testController;
LogTestController::getInstance().setDebug<core::ContentRepository>();
LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
LogTestController::getInstance().setTrace<core::repository::FlowFileRepository>();
// both repositories persist below a per-test temporary directory
auto dir = testController.createTempDirectory();
auto config = std::make_shared<minifi::ConfigureImpl>();
config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, (dir / "content_repository").string());
config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string());
std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestThreadedRepository>();
auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
ff_repository->initialize(config);
content_repo->initialize(config);
auto flowConfig = std::make_unique<core::FlowConfiguration>(core::ConfigurationContext{
.flow_file_repo = ff_repository,
.content_repo = content_repo,
.configuration = config,
.path = "",
.filesystem = std::make_shared<utils::file::FileSystem>(),
.sensitive_values_encryptor = utils::crypto::EncryptionProvider{utils::crypto::XSalsa20Cipher{utils::crypto::XSalsa20Cipher::generateKey()}}
});
auto flowController = std::make_shared<minifi::FlowController>(prov_repo, ff_repository, config, std::move(flowConfig), content_repo);
// first run: feed two FlowFiles, fewer than MinEntries, so nothing may be merged yet
{
TestFlow flow(ff_repository, content_repo, prov_repo, setupMergeProcessor, MergeContent::Merge);
flowController->load(std::move(flow.root_));
ff_repository->start();
REQUIRE(verifyEventHappenedInPollTime(std::chrono::seconds(1), [&ff_repository]{ return ff_repository->isRunning(); }));
// write two files into the input
flow.write("one");
flow.write("two");
// capture them with the Merge Processor
flow.trigger();
flow.trigger();
ff_repository->stop();
flowController->stop();
// check if the processor has taken ownership
// (input queue drained, nothing merged into the output yet, nothing expired)
std::set<std::shared_ptr<core::FlowFile>> expired;
auto file = flow.input_->poll(expired);
REQUIRE(!file);
REQUIRE(expired.empty());
file = flow.output_->poll(expired);
REQUIRE(!file);
REQUIRE(expired.empty());
}
// swap the ProcessGroup and restart the FlowController
{
TestFlow flow(ff_repository, content_repo, prov_repo, setupMergeProcessor, MergeContent::Merge);
flowController->load(std::move(flow.root_));
ff_repository->start();
REQUIRE(verifyEventHappenedInPollTime(std::chrono::seconds(1), [&ff_repository]{ return ff_repository->isRunning(); }));
// NOTE(review): presumably the FlowFileRepository logs "Found connection for" once per restored FlowFile
// (the two written above); confirm against FlowFileRepository.
REQUIRE(verifyEventHappenedInPollTime(std::chrono::seconds(1), []{ return LogTestController::getInstance().countOccurrences("Found connection for") == 2; }));
// write the third file into the input
flow.write("three");
flow.trigger();
ff_repository->stop();
flowController->stop();
std::shared_ptr<org::apache::nifi::minifi::core::FlowFile> file = nullptr;
std::set<std::shared_ptr<core::FlowFile>> expired;
// poll the output connection until the merged FlowFile shows up (checked every 50 ms, for up to 1 s)
const auto flowFileArrivedInOutput = [&file, &expired, &flow] {
file = flow.output_->poll(expired);
return file != nullptr;
};
REQUIRE(verifyEventHappenedInPollTime(std::chrono::seconds(1), flowFileArrivedInOutput, std::chrono::milliseconds(50)));
REQUIRE(expired.empty());
auto content = flow.read(file);
// See important note about matchers at: https://github.com/catchorg/Catch2/blob/e8cdfdca87ebacd993befdd08ea6aa7e8068ef3d/docs/matchers.md#using-matchers
// The relative order of the two restored FlowFiles ("one", "two") is not asserted; "three" must come last.
REQUIRE_THAT(content, Catch::Matchers::Equals("_Header_one_Demarcator_two_Demarcator_three_Footer_") || Catch::Matchers::Equals("_Header_two_Demarcator_one_Demarcator_three_Footer_"));
}
}
/// Test processor that replaces the content of an incoming FlowFile with the fixed string "<override>"
/// (written as new content through session.importFrom) and transfers it to the "success" relationship.
/// If no FlowFile is available, onTrigger does nothing.
class ContentUpdaterProcessor : public core::ProcessorImpl {
 public:
  using ProcessorImpl::ProcessorImpl;
  static constexpr bool SupportsDynamicProperties = false;
  static constexpr bool SupportsDynamicRelationships = false;
  static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
  static constexpr bool IsSingleThreaded = false;
  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS

  void onTrigger(core::ProcessContext&, core::ProcessSession& session) override {
    auto ff = session.get();
    // The input queue may be empty. Importing into / transferring a null FlowFile must not happen.
    if (!ff) {
      return;
    }
    std::string data = "<override>";
    minifi::io::BufferStream stream(data);
    session.importFrom(stream, ff);
    session.transfer(ff, {"success", "d"});
  }
};
/// Processor generator for TestFlow: a ContentUpdaterProcessor named "Updater" carrying the given id.
std::unique_ptr<core::Processor> setupContentUpdaterProcessor(const utils::Identifier& id) {
  std::unique_ptr<core::Processor> updater = minifi::test::utils::make_processor<ContentUpdaterProcessor>("Updater", id);
  return updater;
}
// For each content repository implementation (one SECTION each), checks two things:
//  1. Replacing the content of a persisted FlowFile releases the old resource claim. Exactly one
//     "Deleting resource" log line is expected, and it must be for the old claim's content.
//  2. After loading a second TestFlow with the same ids, the FlowFile restored into the Output connection
//     carries the new content.
TEST_CASE("Persisted flowFiles are updated on modification", "[TestP1]") {
TestController testController;
LogTestController::getInstance().setDebug<core::ContentRepository>();
LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
LogTestController::getInstance().setTrace<core::repository::FlowFileRepository>();
LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>();
auto dir = testController.createTempDirectory();
auto config = std::make_shared<minifi::ConfigureImpl>();
config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, (dir / "content_repository").string());
config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string());
// NOTE(review): presumably a zero purge period makes DatabaseContentRepository delete released content
// right away, so the "Deleting resource" check below does not race a background purge; confirm.
config->set(minifi::Configure::nifi_dbcontent_repository_purge_period, "0 s");
std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestThreadedRepository>();
std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
// chosen by exactly one of the SECTIONs below; Catch2 runs the test case once per SECTION
std::shared_ptr<core::ContentRepository> content_repo;
SECTION("VolatileContentRepository") {
testController.getLogger()->log_info("Using VolatileContentRepository");
content_repo = std::make_shared<core::repository::VolatileContentRepository>();
}
SECTION("FileSystemContentRepository") {
testController.getLogger()->log_info("Using FileSystemRepository");
content_repo = std::make_shared<core::repository::FileSystemRepository>();
}
SECTION("DatabaseContentRepository") {
testController.getLogger()->log_info("Using DatabaseContentRepository");
content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
}
ff_repository->initialize(config);
content_repo->initialize(config);
auto flowConfig = std::make_unique<core::FlowConfiguration>(core::ConfigurationContext{
.flow_file_repo = ff_repository,
.content_repo = content_repo,
.configuration = config,
.path = "",
.filesystem = std::make_shared<utils::file::FileSystem>(),
.sensitive_values_encryptor = utils::crypto::EncryptionProvider{utils::crypto::XSalsa20Cipher{utils::crypto::XSalsa20Cipher::generateKey()}}
});
auto flowController = std::make_shared<minifi::FlowController>(prov_repo, ff_repository, config, std::move(flowConfig), content_repo);
{
TestFlow flow(ff_repository, content_repo, prov_repo, setupContentUpdaterProcessor, {"success", "d"});
flowController->load(std::move(flow.root_));
ff_repository->start();
REQUIRE(verifyEventHappenedInPollTime(std::chrono::seconds(1), [&ff_repository]{ return ff_repository->isRunning(); }));
std::string removedResource;
// inner scope: `claim` (the original content) must be destroyed before the deletion log is checked
{
// write a single file into the input
auto flowFile = flow.write("data");
auto claim = flowFile->getResourceClaim();
removedResource = claim->getContentFullPath();
// one from the FlowFile and one from the persisted instance
REQUIRE(claim->getFlowFileRecordOwnedCount() == 2);
// replace its content with the ContentUpdaterProcessor
flow.trigger();
auto content = flow.read(flowFile);
REQUIRE(content == "<override>");
auto newClaim = flowFile->getResourceClaim();
// the processor added new content to the flowFile
REQUIRE(claim != newClaim);
// only this instance behind this shared_ptr keeps the resource alive
REQUIRE(claim.use_count() == 1);
REQUIRE(claim->getFlowFileRecordOwnedCount() == 1);
// one from the FlowFile and one from the persisted instance
REQUIRE(newClaim->getFlowFileRecordOwnedCount() == 2);
}
// the old claim is gone: its content was deleted exactly once, and no other content was deleted
REQUIRE(LogTestController::getInstance().countOccurrences("Deleting resource " + removedResource) == 1);
REQUIRE(LogTestController::getInstance().countOccurrences("Deleting resource") == 1);
ff_repository->stop();
flowController->stop();
}
// swap the ProcessGroup and restart the FlowController
// (no trigger here: the FlowFile found in Output must be the one restored from the FlowFile repository)
{
TestFlow flow(ff_repository, content_repo, prov_repo, setupContentUpdaterProcessor, {"success", "d"});
flowController->load(std::move(flow.root_));
ff_repository->start();
REQUIRE(verifyEventHappenedInPollTime(std::chrono::seconds(1), [&ff_repository]{ return ff_repository->isRunning(); }));
std::set<std::shared_ptr<core::FlowFile>> expired;
std::shared_ptr<org::apache::nifi::minifi::core::FlowFile> file = nullptr;
// poll the output connection until the restored FlowFile shows up (checked every 50 ms, for up to 1 s)
const auto flowFileArrivedInOutput = [&file, &expired, &flow] {
file = flow.output_->poll(expired);
return file != nullptr;
};
REQUIRE(verifyEventHappenedInPollTime(std::chrono::seconds(1), flowFileArrivedInOutput, std::chrono::milliseconds(50)));
REQUIRE(expired.empty());
auto content = flow.read(file);
REQUIRE(content == "<override>");
// the still persisted instance and this FlowFile
REQUIRE(file->getResourceClaim()->getFlowFileRecordOwnedCount() == 2);
ff_repository->stop();
flowController->stop();
}
}
} // namespace