blob: 4024e67e24b079fd9198b0f7002fac94844821d3 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <utility>
#include <memory>
#include <string>
#include <vector>
#include <set>
#include <fstream>
#include "TestBase.h"
#include "LogAttribute.h"
#include "GetFile.h"
#include "unit/ProvenanceTestHelper.h"
#include "core/Core.h"
#include "core/FlowFile.h"
#include "core/Processor.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/ProcessorNode.h"
#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
TestController testController;
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("processorname");
REQUIRE(processor->getName() == "processorname");
}
TEST_CASE("Test GetFileMultiple", "[getfileCreate3]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
processor->initialize();
std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
char format[] = "/tmp/gt.XXXXXX";
auto dir = testController.createTempDirectory(format);
utils::Identifier processoruuid;
REQUIRE(true == processor->getUUID(processoruuid));
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, content_repo, "getfileCreate2Connection");
connection->addRelationship(core::Relationship("success", "description"));
// link the connections so that we can test results at the end for this
connection->setSource(processor);
connection->setDestination(processor);
connection->setSourceUUID(processoruuid);
connection->setDestinationUUID(processoruuid);
processor->addConnection(connection);
REQUIRE(!dir.empty());
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
context->setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, dir);
// replicate 10 threads
processor->setScheduledState(core::ScheduledState::RUNNING);
auto factory = std::make_shared<core::ProcessSessionFactory>(context);
processor->onSchedule(context, factory);
int prev = 0;
for (int i = 0; i < 10; i++) {
auto session = std::make_shared<core::ProcessSession>(context);
REQUIRE(processor->getName() == "getfileCreate2");
std::shared_ptr<core::FlowFile> record;
processor->onTrigger(context, session);
auto reporter = session->getProvenanceReporter();
auto records = reporter->getEvents();
record = session->get();
REQUIRE(record == nullptr);
REQUIRE(records.size() == 0);
std::fstream file;
std::stringstream ss;
ss << dir << utils::file::FileUtils::get_separator() << "tstFile.ext";
file.open(ss.str(), std::ios::out);
file << "tempFile";
file.close();
processor->incrementActiveTasks();
processor->setScheduledState(core::ScheduledState::RUNNING);
processor->onTrigger(context, session);
unlink(ss.str().c_str());
reporter = session->getProvenanceReporter();
REQUIRE(processor->getName() == "getfileCreate2");
records = reporter->getEvents();
for (auto provEventRecord : records) {
REQUIRE(provEventRecord->getComponentType() == processor->getName());
}
session->commit();
std::shared_ptr<core::FlowFile> ffr = session->get();
REQUIRE(repo->getRepoMap().size() == (prev + 1));
prev++;
}
}
TEST_CASE("Test GetFile Ignore", "[getfileCreate3]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
char format[] = "/tmp/gt.XXXXXX";
auto dir = testController.createTempDirectory(format);
utils::Identifier processoruuid;
REQUIRE(true == processor->getUUID(processoruuid));
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, content_repo, "getfileCreate2Connection");
connection->addRelationship(core::Relationship("success", "description"));
// link the connections so that we can test results at the end for this
connection->setSource(processor);
connection->setDestination(processor);
connection->setSourceUUID(processoruuid);
connection->setDestinationUUID(processoruuid);
processor->addConnection(connection);
REQUIRE(!dir.empty());
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
context->setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, dir);
// replicate 10 threads
processor->setScheduledState(core::ScheduledState::RUNNING);
auto factory = std::make_shared<core::ProcessSessionFactory>(context);
processor->onSchedule(context, factory);
int prev = 0;
auto session = std::make_shared<core::ProcessSession>(context);
REQUIRE(processor->getName() == "getfileCreate2");
std::shared_ptr<core::FlowFile> record;
processor->onTrigger(context, session);
auto reporter = session->getProvenanceReporter();
auto records = reporter->getEvents();
record = session->get();
REQUIRE(record == nullptr);
REQUIRE(records.size() == 0);
std::fstream file;
std::stringstream ss;
ss << dir << utils::file::FileUtils::get_separator() << ".filewithoutanext";
file.open(ss.str(), std::ios::out);
file << "tempFile";
file.close();
processor->incrementActiveTasks();
processor->setScheduledState(core::ScheduledState::RUNNING);
processor->onTrigger(context, session);
unlink(ss.str().c_str());
reporter = session->getProvenanceReporter();
REQUIRE(processor->getName() == "getfileCreate2");
records = reporter->getEvents();
for (auto provEventRecord : records) {
REQUIRE(provEventRecord->getComponentType() == processor->getName());
}
session->commit();
std::shared_ptr<core::FlowFile> ffr = session->get();
REQUIRE(repo->getRepoMap().size() == 0);
prev++;
}
TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
char format[] = "/tmp/gt.XXXXXX";
auto dir = testController.createTempDirectory(format);
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
testController.runSession(plan, false);
auto records = plan->getProvenanceRecords();
std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
REQUIRE(record == nullptr);
REQUIRE(records.size() == 0);
std::fstream file;
std::stringstream ss;
ss << dir << utils::file::FileUtils::get_separator() << "tstFile.ext";
file.open(ss.str(), std::ios::out);
file << "tempFile";
file.close();
plan->reset();
testController.runSession(plan, false);
unlink(ss.str().c_str());
records = plan->getProvenanceRecords();
record = plan->getCurrentFlowFile();
testController.runSession(plan, false);
records = plan->getProvenanceRecords();
record = plan->getCurrentFlowFile();
REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path value:" + ss.str()));
REQUIRE(true == LogTestController::getInstance().contains("Size:8 Offset:0"));
REQUIRE(true == LogTestController::getInstance().contains("key:path value:" + std::string(dir)));
LogTestController::getInstance().reset();
}
TEST_CASE("LogAttributeTestInvalid", "[TestLogAttribute]") {
TestController testController;
LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>();
LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
auto loggattr = plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
char format[] = "/tmp/gt.XXXXXX";
auto dir = testController.createTempDirectory(format);
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::BatchSize.getName(), "1");
REQUIRE_THROWS_AS(plan->setProperty(loggattr, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog.getName(), "-1"), std::out_of_range);
LogTestController::getInstance().reset();
}
void testMultiplesLogAttribute(int fileCount, int flowsToLog, std::string verifyStringFlowsLogged = "") {
TestController testController;
LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>();
LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
auto loggattr = plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
char format[] = "/tmp/gt.XXXXXX";
auto dir = testController.createTempDirectory(format);
auto flowsToLogStr = std::to_string(flowsToLog);
if (verifyStringFlowsLogged.empty())
verifyStringFlowsLogged = std::to_string(flowsToLog);
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::BatchSize.getName(), std::to_string(fileCount));
plan->setProperty(loggattr, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog.getName(), flowsToLogStr);
testController.runSession(plan, false);
auto records = plan->getProvenanceRecords();
std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
REQUIRE(record == nullptr);
REQUIRE(records.size() == 0);
std::vector<std::string> files;
for (int i = 0; i < fileCount; i++) {
std::fstream file;
std::stringstream ss;
ss << dir << utils::file::FileUtils::get_separator() << "tstFile" << i << ".ext";
file.open(ss.str(), std::ios::out);
file << "tempFile";
file.close();
files.push_back(ss.str());
}
plan->reset();
testController.runSession(plan, false);
for (const auto &created_file : files) {
unlink(created_file.c_str());
}
records = plan->getProvenanceRecords();
record = plan->getCurrentFlowFile();
testController.runSession(plan, false);
records = plan->getProvenanceRecords();
record = plan->getCurrentFlowFile();
REQUIRE(true == LogTestController::getInstance().contains("Size:8 Offset:0"));
REQUIRE(true == LogTestController::getInstance().contains("key:path value:" + std::string(dir)));
REQUIRE(true == LogTestController::getInstance().contains("Logged " + verifyStringFlowsLogged + " flow files"));
LogTestController::getInstance().reset();
}
TEST_CASE("LogAttributeTestSingle", "[TestLogAttribute]") {
testMultiplesLogAttribute(1, 3, "1");
}
TEST_CASE("LogAttributeTestMultiple", "[TestLogAttribute]") {
testMultiplesLogAttribute(5, 3);
}
TEST_CASE("LogAttributeTestAll", "[TestLogAttribute]") {
testMultiplesLogAttribute(5, 0, "5");
}
TEST_CASE("Test Find file", "[getfileCreate3]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::provenance::ProvenanceReporter>();
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> processor = plan->addProcessor("GetFile", "getfileCreate2");
std::shared_ptr<core::Processor> processorReport = std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(
minifi::io::StreamFactory::getInstance(std::make_shared<org::apache::nifi::minifi::Configure>()), std::make_shared<org::apache::nifi::minifi::Configure>());
plan->addProcessor(processorReport, "reporter", core::Relationship("success", "description"), false);
char format[] = "/tmp/gt.XXXXXX";
auto dir = testController.createTempDirectory(format);
plan->setProperty(processor, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
testController.runSession(plan, false);
auto records = plan->getProvenanceRecords();
std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
REQUIRE(record == nullptr);
REQUIRE(records.size() == 0);
std::fstream file;
std::stringstream ss;
ss << dir << utils::file::FileUtils::get_separator() << "tstFile.ext";
file.open(ss.str(), std::ios::out);
file << "tempFile";
file.close();
plan->reset();
testController.runSession(plan, false);
records = plan->getProvenanceRecords();
record = plan->getCurrentFlowFile();
for (auto provEventRecord : records) {
REQUIRE(provEventRecord->getComponentType() == processor->getName());
}
std::shared_ptr<core::FlowFile> ffr = plan->getCurrentFlowFile();
REQUIRE(ffr != nullptr);
ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
auto repo = std::static_pointer_cast<TestRepository>(plan->getProvenanceRepo());
REQUIRE(2 == repo->getRepoMap().size());
for (auto entry : repo->getRepoMap()) {
provenance::ProvenanceEventRecord newRecord;
newRecord.DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(entry.second.data())), entry.second.length());
bool found = false;
for (auto provRec : records) {
if (provRec->getEventId() == newRecord.getEventId()) {
REQUIRE(provRec->getEventId() == newRecord.getEventId());
REQUIRE(provRec->getComponentId() == newRecord.getComponentId());
REQUIRE(provRec->getComponentType() == newRecord.getComponentType());
REQUIRE(provRec->getDetails() == newRecord.getDetails());
REQUIRE(provRec->getEventDuration() == newRecord.getEventDuration());
found = true;
break;
}
}
if (!found) {
throw std::runtime_error("Did not find record");
}
}
std::shared_ptr<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask> taskReport = std::static_pointer_cast<
org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(processorReport);
taskReport->setBatchSize(1);
std::vector<std::shared_ptr<core::SerializableComponent>> recordsReport;
recordsReport.push_back(std::make_shared<provenance::ProvenanceEventRecord>());
processorReport->incrementActiveTasks();
processorReport->setScheduledState(core::ScheduledState::RUNNING);
std::string jsonStr;
std::size_t deserialized = 0;
repo->DeSerialize(recordsReport, deserialized);
std::function<void(const std::shared_ptr<core::ProcessContext> &, const std::shared_ptr<core::ProcessSession>&)> verifyReporter =
[&](const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
taskReport->getJsonReport(context, session, recordsReport, jsonStr);
REQUIRE(recordsReport.size() == 1);
REQUIRE(taskReport->getName() == std::string(org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask::ReportTaskName));
REQUIRE(jsonStr.find("\"componentType\": \"getfileCreate2\"") != std::string::npos);
};
testController.runSession(plan, false, verifyReporter);
}
class TestProcessorNoContent : public minifi::core::Processor {
public:
explicit TestProcessorNoContent(std::string name, utils::Identifier uuid = NULL)
: Processor(name, uuid),
Success("success", "All files are routed to success") {
}
// Destructor
virtual ~TestProcessorNoContent() {
}
core::Relationship Success;
/**
* Function that's executed when the processor is scheduled.
* @param context process context.
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
}
/**
* Execution trigger for the GetFile Processor
* @param context processor context
* @param session processor session reference.
*/
virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
auto ff = session->create();
ff->addAttribute("Attribute", "AttributeValue");
session->transfer(ff, Success);
}
};
REGISTER_RESOURCE(TestProcessorNoContent, "test resource");
TEST_CASE("TestEmptyContent", "[emptyContent]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
LogTestController::getInstance().setDebug<TestPlan>();
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> getfile = plan->addProcessor("TestProcessorNoContent", "TestProcessorNoContent");
plan->runNextProcessor();
// segfault
LogTestController::getInstance().reset();
}
/**
* Tests the RPG bypass feature
* @param host to configure
* @param port port string to configure
* @param portVal port value to search in the corresponding log message
* @param hasException dictates if a failure should occur
*/
void testRPGBypass(const std::string &host, const std::string &port, const std::string &portVal = "-1", bool hasException = true) {
TestController testController;
LogTestController::getInstance().setTrace<minifi::RemoteProcessorGroupPort>();
LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
LogTestController::getInstance().setTrace<TestPlan>();
auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
auto factory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
auto rpg = std::make_shared<minifi::RemoteProcessorGroupPort>(factory, "rpg", "http://localhost:8989/nifi", configuration);
rpg->initialize();
REQUIRE(rpg->setProperty(minifi::RemoteProcessorGroupPort::hostName, host));
rpg->setProperty(minifi::RemoteProcessorGroupPort::port, port);
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(rpg);
std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
auto psf = std::make_shared<core::ProcessSessionFactory>(context);
if (hasException) {
auto expected_error = "Site2Site Protocol:HTTPClient not resolvable. No peers configured or any port specific hostname and port -- cannot schedule";
try {
rpg->onSchedule(context, psf);
} catch (std::exception &e) {
REQUIRE(expected_error == std::string(e.what()));
}
std::stringstream search_expr;
search_expr << " " << host << "/" << port << "/" << portVal << " -- configuration values after eval of configuration options";
REQUIRE(LogTestController::getInstance().contains(search_expr.str()));
}
LogTestController::getInstance().reset();
}
/**
* Since there is no curl loaded in this test folder, we will have is_http_disabled be true.
*/
TEST_CASE("TestRPGNoSettings", "[TestRPG1]") {
testRPGBypass("", "");
}
TEST_CASE("TestRPGWithHost", "[TestRPG2]") {
testRPGBypass("hostname", "");
}
TEST_CASE("TestRPGWithHostInvalidPort", "[TestRPG3]") {
testRPGBypass("hostname", "hostname");
}
TEST_CASE("TestRPGWithoutHostValidPort", "[TestRPG4]") {
testRPGBypass("", "8080");
}
TEST_CASE("TestRPGWithoutHostInvalidPort", "[TestRPG5]") {
testRPGBypass("", "hostname");
}
TEST_CASE("TestRPGValid", "[TestRPG6]") {
testRPGBypass("", "8080", "8080", false);
}
int fileSize(const char *add) {
std::ifstream mySource;
mySource.open(add, std::ios_base::binary);
mySource.seekg(0, std::ios_base::end);
int size = mySource.tellg();
mySource.close();
return size;
}