blob: 141a197656c79f97dc6264b5c5b3a521e3ec736b [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <utility>
#include <memory>
#include <string>
#include <vector>
#include <set>
#include <fstream>
#include "unit/ProvenanceTestHelper.h"
#include "TestBase.h"
#include "RandomServerSocket.h"
#include "Scheduling.h"
#include "LogAttribute.h"
#include "GetTCP.h"
#include "core/Core.h"
#include "core/FlowFile.h"
#include "core/Processor.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/ProcessorNode.h"
#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
TEST_CASE("GetTCPWithoutEOM", "[GetTCP1]") {
utils::Identifier ident = utils::Identifier();
TestController testController;
std::vector<uint8_t> buffer;
for (auto c : "Hello World\nHello Warld\nGoodByte Cruel world") {
buffer.push_back(c);
}
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(std::make_shared<minifi::Configure>());
std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
org::apache::nifi::minifi::io::RandomServerSocket server(org::apache::nifi::minifi::io::Socket::getMyHostName());
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
LogTestController::getInstance().setDebug<minifi::processors::GetTCP>();
LogTestController::getInstance().setTrace<minifi::io::Socket>();
std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetTCP>("gettcpexample");
std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
processor->setStreamFactory(stream_factory);
processor->initialize();
utils::Identifier processoruuid = processor->getUUID();
REQUIRE(processoruuid);
utils::Identifier logattribute_uuid = logAttribute->getUUID();
REQUIRE(logattribute_uuid);
REQUIRE(processoruuid.to_string() != logattribute_uuid.to_string());
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "gettcpexampleConnection");
connection->addRelationship(core::Relationship("success", "description"));
std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
connection2->addRelationship(core::Relationship("success", "description"));
// link the connections so that we can test results at the end for this
connection->setSource(processor);
// link the connections so that we can test results at the end for this
connection->setDestination(logAttribute);
connection2->setSource(logAttribute);
connection2->setSourceUUID(logattribute_uuid);
connection->setSourceUUID(processoruuid);
connection->setDestinationUUID(logattribute_uuid);
processor->addConnection(connection);
logAttribute->addConnection(connection);
logAttribute->addConnection(connection2);
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(logAttribute);
std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, content_repo);
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList, org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" + std::to_string(server.getPort()));
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval, "100 msec");
auto session = std::make_shared<core::ProcessSession>(context);
auto session2 = std::make_shared<core::ProcessSession>(context2);
REQUIRE(processor->getName() == "gettcpexample");
std::shared_ptr<core::FlowFile> record;
processor->setScheduledState(core::ScheduledState::RUNNING);
std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
processor->onSchedule(context, factory);
processor->onTrigger(context, session);
server.write(buffer, buffer.size());
std::this_thread::sleep_for(std::chrono::seconds(2));
logAttribute->initialize();
logAttribute->incrementActiveTasks();
logAttribute->setScheduledState(core::ScheduledState::RUNNING);
std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
logAttribute->onSchedule(context2, factory2);
logAttribute->onTrigger(context2, session2);
auto reporter = session->getProvenanceReporter();
auto records = reporter->getEvents();
record = session->get();
REQUIRE(record == nullptr);
REQUIRE(records.size() == 0);
processor->incrementActiveTasks();
processor->setScheduledState(core::ScheduledState::RUNNING);
processor->onTrigger(context, session);
reporter = session->getProvenanceReporter();
records = reporter->getEvents();
session->commit();
logAttribute->incrementActiveTasks();
logAttribute->setScheduledState(core::ScheduledState::RUNNING);
logAttribute->onTrigger(context2, session2);
records = reporter->getEvents();
REQUIRE(true == LogTestController::getInstance().contains("Size:45 Offset:0"));
LogTestController::getInstance().reset();
}
TEST_CASE("GetTCPWithOEM", "[GetTCP2]") {
std::vector<uint8_t> buffer;
for (auto c : "Hello World\nHello Warld\nGoodByte Cruel world") {
buffer.push_back(c);
}
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(std::make_shared<minifi::Configure>());
std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
TestController testController;
org::apache::nifi::minifi::io::RandomServerSocket server(org::apache::nifi::minifi::io::Socket::getMyHostName());
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository >();
LogTestController::getInstance().setTrace<minifi::processors::GetTCP>();
LogTestController::getInstance().setTrace<core::ConfigurableComponent>();
LogTestController::getInstance().setTrace<minifi::io::Socket>();
std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetTCP>("gettcpexample");
std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
processor->setStreamFactory(stream_factory);
processor->initialize();
utils::Identifier processoruuid = processor->getUUID();
REQUIRE(processoruuid);
utils::Identifier logattribute_uuid = logAttribute->getUUID();
REQUIRE(logattribute_uuid);
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "gettcpexampleConnection");
connection->addRelationship(core::Relationship("partial", "description"));
std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
connection2->addRelationship(core::Relationship("partial", "description"));
// link the connections so that we can test results at the end for this
connection->setSource(processor);
// link the connections so that we can test results at the end for this
connection->setDestination(logAttribute);
connection2->setSource(logAttribute);
connection2->setSourceUUID(logattribute_uuid);
connection->setSourceUUID(processoruuid);
connection->setDestinationUUID(logattribute_uuid);
processor->addConnection(connection);
logAttribute->addConnection(connection);
logAttribute->addConnection(connection2);
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(logAttribute);
std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, content_repo);
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList, org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" + std::to_string(server.getPort()));
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval, "100 msec");
// we're using new lines above
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndOfMessageByte, "10");
auto session = std::make_shared<core::ProcessSession>(context);
auto session2 = std::make_shared<core::ProcessSession>(context2);
REQUIRE(processor->getName() == "gettcpexample");
std::shared_ptr<core::FlowFile> record;
processor->setScheduledState(core::ScheduledState::RUNNING);
std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
processor->onSchedule(context, factory);
processor->onTrigger(context, session);
server.write(buffer, buffer.size());
std::this_thread::sleep_for(std::chrono::seconds(2));
logAttribute->initialize();
logAttribute->incrementActiveTasks();
logAttribute->setScheduledState(core::ScheduledState::RUNNING);
std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
logAttribute->onSchedule(context2, factory2);
logAttribute->onTrigger(context2, session2);
auto reporter = session->getProvenanceReporter();
auto records = reporter->getEvents();
record = session->get();
REQUIRE(record == nullptr);
REQUIRE(records.size() == 0);
processor->incrementActiveTasks();
processor->setScheduledState(core::ScheduledState::RUNNING);
processor->onTrigger(context, session);
reporter = session->getProvenanceReporter();
records = reporter->getEvents();
session->commit();
logAttribute->incrementActiveTasks();
logAttribute->setScheduledState(core::ScheduledState::RUNNING);
logAttribute->onTrigger(context2, session2);
records = reporter->getEvents();
logAttribute->incrementActiveTasks();
logAttribute->setScheduledState(core::ScheduledState::RUNNING);
logAttribute->onTrigger(context2, session2);
records = reporter->getEvents();
REQUIRE(true == LogTestController::getInstance().contains("Size:11 Offset:0"));
REQUIRE(true == LogTestController::getInstance().contains("Size:12 Offset:0"));
REQUIRE(true == LogTestController::getInstance().contains("Size:22 Offset:0"));
LogTestController::getInstance().reset();
}
TEST_CASE("GetTCPWithOnlyOEM", "[GetTCP3]") {
std::vector<uint8_t> buffer;
for (auto c : "\n") {
buffer.push_back(c);
}
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(std::make_shared<minifi::Configure>());
std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
TestController testController;
LogTestController::getInstance().setDebug<minifi::io::Socket>();
org::apache::nifi::minifi::io::RandomServerSocket server(org::apache::nifi::minifi::io::Socket::getMyHostName());
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
LogTestController::getInstance().setDebug<minifi::processors::GetTCP>();
std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetTCP>("gettcpexample");
std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
processor->setStreamFactory(stream_factory);
processor->initialize();
utils::Identifier processoruuid = processor->getUUID();
REQUIRE(processoruuid);
utils::Identifier logattribute_uuid = logAttribute->getUUID();
REQUIRE(logattribute_uuid);
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "gettcpexampleConnection");
connection->addRelationship(core::Relationship("success", "description"));
std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
connection2->addRelationship(core::Relationship("success", "description"));
// link the connections so that we can test results at the end for this
connection->setSource(processor);
// link the connections so that we can test results at the end for this
connection->setDestination(logAttribute);
connection2->setSource(logAttribute);
connection2->setSourceUUID(logattribute_uuid);
connection->setSourceUUID(processoruuid);
connection->setDestinationUUID(logattribute_uuid);
processor->addConnection(connection);
logAttribute->addConnection(connection);
logAttribute->addConnection(connection2);
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(logAttribute);
std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, content_repo);
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList, org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" + std::to_string(server.getPort()));
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval, "100 msec");
// we're using new lines above
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndOfMessageByte, "10");
auto session = std::make_shared<core::ProcessSession>(context);
auto session2 = std::make_shared<core::ProcessSession>(context2);
REQUIRE(processor->getName() == "gettcpexample");
std::shared_ptr<core::FlowFile> record;
processor->setScheduledState(core::ScheduledState::RUNNING);
std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
processor->onSchedule(context, factory);
processor->onTrigger(context, session);
server.write(buffer, buffer.size());
std::this_thread::sleep_for(std::chrono::seconds(2));
logAttribute->initialize();
logAttribute->incrementActiveTasks();
logAttribute->setScheduledState(core::ScheduledState::RUNNING);
std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
logAttribute->onSchedule(context2, factory2);
logAttribute->onTrigger(context2, session2);
auto reporter = session->getProvenanceReporter();
auto records = reporter->getEvents();
record = session->get();
REQUIRE(record == nullptr);
REQUIRE(records.size() == 0);
processor->incrementActiveTasks();
processor->setScheduledState(core::ScheduledState::RUNNING);
processor->onTrigger(context, session);
reporter = session->getProvenanceReporter();
records = reporter->getEvents();
session->commit();
logAttribute->incrementActiveTasks();
logAttribute->setScheduledState(core::ScheduledState::RUNNING);
logAttribute->onTrigger(context2, session2);
records = reporter->getEvents();
logAttribute->incrementActiveTasks();
logAttribute->setScheduledState(core::ScheduledState::RUNNING);
logAttribute->onTrigger(context2, session2);
records = reporter->getEvents();
REQUIRE(true == LogTestController::getInstance().contains("Size:2 Offset:0"));
LogTestController::getInstance().reset();
}
TEST_CASE("GetTCPEmptyNoConnect", "[GetTCP3]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
LogTestController::getInstance().setDebug<minifi::processors::GetTCP>();
LogTestController::getInstance().setTrace<minifi::io::Socket>();
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetTCP", "gettcpexample");
plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetTCP::EndpointList.getName(), org::apache::nifi::minifi::io::Socket::getMyHostName() + ":9182");
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval.getName(), "100 msec");
// we're using new lines above
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetTCP::EndOfMessageByte.getName(), "10");
testController.runSession(plan, false);
auto records = plan->getProvenanceRecords();
std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
REQUIRE(record == nullptr);
REQUIRE(records.size() == 0);
REQUIRE(true == LogTestController::getInstance().contains("Could not create socket during initialization for " + org::apache::nifi::minifi::io::Socket::getMyHostName() + ":9182"));
LogTestController::getInstance().reset();
}