blob: 1ba607bfce054d4316a20f1abc73d627534f9ec6 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#undef NDEBUG
#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include "core/Core.h"
#include "core/repository/AtomicRepoEntries.h"
#include "core/RepositoryFactory.h"
#include "FlowFileRecord.h"
#include "FlowFileRepository.h"
#include "properties/Configure.h"
#include "../unit/ProvenanceTestHelper.h"
#include "../TestBase.h"
#include "../../extensions/libarchive/MergeContent.h"
#include "../test/BufferReader.h"
#include "core/repository/VolatileFlowFileRepository.h"
// Shorthand aliases for the types used throughout the tests below.
using Connection = minifi::Connection;
using MergeContent = minifi::processors::MergeContent;
/**
 * Minimal flow fixture used by the persistence tests:
 *
 *   "source" processor --[Input]--> processor under test --[Output]-->
 *
 * The processor under test is created by the supplied generator. Both the
 * processors and the connections use identifiers that are generated once and
 * then shared by every TestFlow instance (see the static accessors below).
 */
struct TestFlow{
  /**
   * @param ff_repository        flow file repository passed to the process contexts and connections
   * @param content_repo         content repository passed to the process contexts and connections
   * @param prov_repo            provenance repository passed to the process contexts
   * @param processorGenerator   creates the processor under test from the given identifier
   * @param relationshipToOutput relationship of the processor under test that feeds `output`
   */
  TestFlow(const std::shared_ptr<core::Repository>& ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<core::Repository>& prov_repo,
        const std::function<std::shared_ptr<core::Processor>(utils::Identifier&)>& processorGenerator, const core::Relationship& relationshipToOutput)
      : ff_repository(ff_repository), content_repo(content_repo), prov_repo(prov_repo) {
    // setup processor
    {
      processor = processorGenerator(mainProcUUID());
      std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
      processorContext = std::make_shared<core::ProcessContext>(node, nullptr, prov_repo, ff_repository, content_repo);
    }

    // setup INPUT processor
    {
      inputProcessor = std::make_shared<core::Processor>("source", inputProcUUID());
      std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(inputProcessor);
      inputContext = std::make_shared<core::ProcessContext>(node, nullptr, prov_repo,
                                                            ff_repository, content_repo);
    }

    // setup Input Connection
    {
      input = std::make_shared<Connection>(ff_repository, content_repo, "Input", inputConnUUID());
      input->setRelationship({"input", "d"});
      input->setDestinationUUID(mainProcUUID());
      input->setSourceUUID(inputProcUUID());
      inputProcessor->addConnection(input);
    }

    // setup Output Connection
    {
      output = std::make_shared<Connection>(ff_repository, content_repo, "Output", outputConnUUID());
      output->setRelationship(relationshipToOutput);
      output->setSourceUUID(mainProcUUID());
    }

    // setup ProcessGroup
    {
      root = std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
      root->addProcessor(processor);
      root->addConnection(input);
      root->addConnection(output);
    }

    // prepare the processor under test for execution
    processor->setScheduledState(core::ScheduledState::RUNNING);
    // The session factory is owned by this fixture; allocating it with a bare
    // `new` and handing out the raw pointer would never free it.
    sessionFactory = std::make_shared<core::ProcessSessionFactory>(processorContext);
    processor->onSchedule(processorContext.get(), sessionFactory.get());
  }

  /**
   * Creates a flow file containing `data` in a session of the "source"
   * processor, transfers it to the "input" relationship and commits.
   * @return the created flow file
   */
  std::shared_ptr<core::FlowFile> write(const std::string& data) {
    minifi::io::DataStream stream(reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
    core::ProcessSession sessionGenFlowFile(inputContext);
    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
    sessionGenFlowFile.importFrom(stream, flow);
    // before the commit only this flow file is expected to own the freshly imported content
    assert(flow->getResourceClaim()->getFlowFileRecordOwnedCount() == 1);
    sessionGenFlowFile.transfer(flow, {"input", "d"});
    sessionGenFlowFile.commit();
    return flow;
  }

  /**
   * Reads `file` through a session of the processor under test.
   * @return the bytes collected by the BufferReader, as a string
   */
  std::string read(const std::shared_ptr<core::FlowFile>& file) {
    core::ProcessSession session(processorContext);
    std::vector<uint8_t> buffer;
    BufferReader reader(buffer);
    session.read(file, &reader);
    return {buffer.data(), buffer.data() + buffer.size()};
  }

  /**
   * Runs onTrigger of the processor under test once in a new session and
   * commits that session.
   */
  void trigger() {
    auto session = std::make_shared<core::ProcessSession>(processorContext);
    processor->onTrigger(processorContext, session);
    session->commit();
  }

  std::shared_ptr<Connection> input;
  std::shared_ptr<Connection> output;
  std::shared_ptr<core::ProcessGroup> root;

 private:
  // Each identifier is generated on first use and then reused by every
  // TestFlow instance; presumably this lets a newly built flow be matched
  // with what an earlier one persisted (see the "swap the ProcessGroup" tests).
  static utils::Identifier& mainProcUUID() {static auto id = utils::IdGenerator::getIdGenerator()->generate(); return id;}
  static utils::Identifier& inputProcUUID() {static auto id = utils::IdGenerator::getIdGenerator()->generate(); return id;}
  static utils::Identifier& inputConnUUID() {static auto id = utils::IdGenerator::getIdGenerator()->generate(); return id;}
  static utils::Identifier& outputConnUUID() {static auto id = utils::IdGenerator::getIdGenerator()->generate(); return id;}

  std::shared_ptr<core::Processor> inputProcessor;
  std::shared_ptr<core::Processor> processor;
  std::shared_ptr<core::Repository> ff_repository;
  std::shared_ptr<core::ContentRepository> content_repo;
  std::shared_ptr<core::Repository> prov_repo;
  std::shared_ptr<core::ProcessContext> inputContext;
  std::shared_ptr<core::ProcessContext> processorContext;
  // keeps the factory passed to onSchedule alive for the lifetime of the fixture
  std::shared_ptr<core::ProcessSessionFactory> sessionFactory;
};
/**
 * Creates and initializes a MergeContent processor named "MergeContent" with
 * the given id. It is configured for concatenation with bin-packing, a text
 * delimiter strategy, a minimum of 3 entries, fixed header/footer/demarcator
 * strings and a maximum bin age of "1 h". The "original" relationship is
 * auto-terminated.
 */
std::shared_ptr<MergeContent> setupMergeProcessor(const utils::Identifier& id) {
  namespace options = org::apache::nifi::minifi::processors::merge_content_options;

  auto merger = std::make_shared<MergeContent>("MergeContent", id);
  merger->initialize();
  merger->setAutoTerminatedRelationships({{"original", "d"}});

  // merge behaviour
  merger->setProperty(MergeContent::MergeFormat, options::MERGE_FORMAT_CONCAT_VALUE);
  merger->setProperty(MergeContent::MergeStrategy, options::MERGE_STRATEGY_BIN_PACK);
  merger->setProperty(MergeContent::DelimiterStrategy, options::DELIMITER_STRATEGY_TEXT);

  // bin limits
  merger->setProperty(MergeContent::MinEntries, "3");

  // text wrapped around and between the merged contents
  merger->setProperty(MergeContent::Header, "_Header_");
  merger->setProperty(MergeContent::Footer, "_Footer_");
  merger->setProperty(MergeContent::Demarcator, "_Demarcator_");

  merger->setProperty(MergeContent::MaxBinAge, "1 h");
  return merger;
}
// Feeds two flow files to a MergeContent processor that needs three entries
// (see setupMergeProcessor), then builds a second TestFlow on the same
// repositories, feeds the third flow file and expects the merged output to
// contain all three contents.
TEST_CASE("Processors Can Store FlowFiles", "[TestP1]") {
TestController testController;
LogTestController::getInstance().setDebug<core::ContentRepository>();
LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
// both repositories are placed under a fresh temporary directory
char format[] = "/var/tmp/test.XXXXXX";
auto dir = testController.createTempDirectory(format);
auto config = std::make_shared<minifi::Configure>();
config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, utils::file::FileUtils::concat_path(dir, "content_repository"));
config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestRepository>();
std::shared_ptr<core::repository::FlowFileRepository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
ff_repository->initialize(config);
content_repo->initialize(config);
auto flowConfig = std::unique_ptr<core::FlowConfiguration>{new core::FlowConfiguration(prov_repo, ff_repository, content_repo, nullptr, config, "")};
auto flowController = std::make_shared<minifi::FlowController>(prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", true);
// first flow: hand two flow files to the Merge Processor
{
TestFlow flow(ff_repository, content_repo, prov_repo, setupMergeProcessor, MergeContent::Merge);
flowController->load(flow.root);
ff_repository->start();
// write two files into the input
flow.write("one");
flow.write("two");
// capture them with the Merge Processor
flow.trigger();
flow.trigger();
ff_repository->stop();
flowController->unload();
// check if the processor has taken ownership
// (neither connection may still hold a flow file, and nothing was merged yet)
std::set<std::shared_ptr<core::FlowFile>> expired;
auto file = flow.input->poll(expired);
REQUIRE(!file);
REQUIRE(expired.empty());
file = flow.output->poll(expired);
REQUIRE(!file);
REQUIRE(expired.empty());
}
// swap the ProcessGroup and restart the FlowController
{
TestFlow flow(ff_repository, content_repo, prov_repo, setupMergeProcessor, MergeContent::Merge);
flowController->load(flow.root);
ff_repository->start();
// wait for FlowFileRepository to start and notify the owners of
// the resurrected FlowFiles
std::this_thread::sleep_for(std::chrono::milliseconds{100});
// write the third file into the input
flow.write("three");
flow.trigger();
ff_repository->stop();
flowController->unload();
std::set<std::shared_ptr<core::FlowFile>> expired;
auto file = flow.output->poll(expired);
REQUIRE(file);
REQUIRE(expired.empty());
auto content = flow.read(file);
// "one" and "two" may appear in either order; "three" is always last
auto isOneOfPossibleResults =
Catch::Equals("_Header_one_Demarcator_two_Demarcator_three_Footer_")
|| Catch::Equals("_Header_two_Demarcator_one_Demarcator_three_Footer_");
REQUIRE_THAT(content, isOneOfPossibleResults);
}
}
/**
 * Test processor that replaces the content of the flow file it receives with
 * the text "<override>" and transfers it to the "success" relationship.
 */
class ContentUpdaterProcessor : public core::Processor{
 public:
  ContentUpdaterProcessor(const std::string& name, utils::Identifier& id) : Processor(name, id) {}

  /**
   * Takes one flow file from the session, overwrites its content and routes
   * it to "success". Does nothing when the session hands back no flow file.
   */
  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override {
    auto ff = session->get();
    if (!ff) {
      // no flow file to work on; importing into an empty pointer must be avoided
      return;
    }
    std::string data = "<override>";
    minifi::io::DataStream stream(reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
    session->importFrom(stream, ff);
    session->transfer(ff, {"success", "d"});
  }
};
/**
 * Processor generator for TestFlow: creates a ContentUpdaterProcessor named
 * "Updater" with the given id.
 */
std::shared_ptr<core::Processor> setupContentUpdaterProcessor(utils::Identifier& id) {
  std::shared_ptr<core::Processor> updater = std::make_shared<ContentUpdaterProcessor>("Updater", id);
  return updater;
}
// Checks that when a processor replaces a flow file's content, the old
// resource is deleted exactly once, and that the flow file polled from the
// output of a second TestFlow built on the same repositories carries the new
// content. Each SECTION selects a different content repository implementation.
TEST_CASE("Persisted flowFiles are updated on modification", "[TestP1]") {
TestController testController;
LogTestController::getInstance().setDebug<core::ContentRepository>();
LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
LogTestController::getInstance().setTrace<core::repository::FlowFileRepository>();
LogTestController::getInstance().setTrace<core::repository::VolatileRepository<minifi::ResourceClaim::Path>>();
// both repository directories are placed under a fresh temporary directory
char format[] = "/var/tmp/test.XXXXXX";
auto dir = testController.createTempDirectory(format);
auto config = std::make_shared<minifi::Configure>();
config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, utils::file::FileUtils::concat_path(dir, "content_repository"));
config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestRepository>();
std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
// assigned by whichever SECTION is active in the current run
std::shared_ptr<core::ContentRepository> content_repo;
SECTION("VolatileContentRepository") {
testController.getLogger()->log_info("Using VolatileContentRepository");
content_repo = std::make_shared<core::repository::VolatileContentRepository>();
}
SECTION("FileSystemContentRepository") {
testController.getLogger()->log_info("Using FileSystemRepository");
content_repo = std::make_shared<core::repository::FileSystemRepository>();
}
ff_repository->initialize(config);
content_repo->initialize(config);
auto flowConfig = std::unique_ptr<core::FlowConfiguration>{new core::FlowConfiguration(prov_repo, ff_repository, content_repo, nullptr, config, "")};
auto flowController = std::make_shared<minifi::FlowController>(prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", true);
// first flow: write one flow file and let the updater replace its content
{
TestFlow flow(ff_repository, content_repo, prov_repo, setupContentUpdaterProcessor, {"success", "d"});
flowController->load(flow.root);
ff_repository->start();
// full path of the original content, remembered for the log check below
std::string removedResource;
{
// write a single file into the input
auto flowFile = flow.write("data");
auto claim = flowFile->getResourceClaim();
removedResource = claim->getContentFullPath();
// one from the FlowFile and one from the persisted instance
REQUIRE(claim->getFlowFileRecordOwnedCount() == 2);
// update it with the ContentUpdaterProcessor
flow.trigger();
auto content = flow.read(flowFile);
REQUIRE(content == "<override>");
auto newClaim = flowFile->getResourceClaim();
// the processor added new content to the flowFile
REQUIRE(claim != newClaim);
// only this instance behind this shared_ptr keeps the resource alive
REQUIRE(claim->getFlowFileRecordOwnedCount() == 1);
// one from the FlowFile and one from the persisted instance
REQUIRE(newClaim->getFlowFileRecordOwnedCount() == 2);
}
// the scope above has closed, dropping the local flowFile/claim/newClaim handles;
// exactly one deletion must have been logged, and it must be the original content
REQUIRE(LogTestController::getInstance().countOccurrences("Deleting resource " + removedResource) == 1);
REQUIRE(LogTestController::getInstance().countOccurrences("Deleting resource") == 1);
ff_repository->stop();
flowController->unload();
}
// swap the ProcessGroup and restart the FlowController
{
TestFlow flow(ff_repository, content_repo, prov_repo, setupContentUpdaterProcessor, {"success", "d"});
flowController->load(flow.root);
ff_repository->start();
// wait for FlowFileRepository to start and notify the owners of
// the resurrected FlowFiles
std::this_thread::sleep_for(std::chrono::milliseconds{100});
// no trigger here: the flow file is expected to already be in the output connection
std::set<std::shared_ptr<core::FlowFile>> expired;
auto file = flow.output->poll(expired);
REQUIRE(file);
REQUIRE(expired.empty());
auto content = flow.read(file);
REQUIRE(content == "<override>");
// the still persisted instance and this FlowFile
REQUIRE(file->getResourceClaim()->getFlowFileRecordOwnedCount() == 2);
ff_repository->stop();
flowController->unload();
}
}