MINIFICPP-1159 - Fix unstable processor integration tests and clean up code duplication
Signed-off-by: Daniel Bakai <bakaid@apache.org>
This closes #736
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 03a2c73..4ab623b 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -34,7 +34,7 @@
class CoapIntegrationBase : public IntegrationBase {
public:
- CoapIntegrationBase(uint64_t waitTime = 60000)
+ CoapIntegrationBase(uint64_t waitTime = DEFAULT_WAITTIME_MSECS)
: IntegrationBase(waitTime),
server(nullptr) {
}
diff --git a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
index 378c143..0d2ec0c 100644
--- a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
+++ b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
@@ -58,7 +58,7 @@
dir = testController.createTempDirectory(format);
}
- void testSetup() {
+ void testSetup() override {
LogTestController::getInstance().setTrace<minifi::RemoteProcessorGroupPort>();
LogTestController::getInstance().setTrace<minifi::sitetosite::HttpSiteToSiteClient>();
LogTestController::getInstance().setTrace<minifi::sitetosite::SiteToSiteClient>();
@@ -78,15 +78,9 @@
configuration->set("nifi.remote.input.socket.port", "8099");
}
- virtual void waitToVerifyProcessor() {
- std::this_thread::sleep_for(std::chrono::seconds(3));
- }
+ void cleanup() override {}
- void cleanup() {
- }
-
- void runAssertions() {
- }
+ void runAssertions() override {}
protected:
bool isSecure;
diff --git a/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp b/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp
index 19d0f31..cb70147 100644
--- a/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp
+++ b/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp
@@ -58,7 +58,7 @@
dir = testController.createTempDirectory(format);
}
- void testSetup() {
+ void testSetup() override {
LogTestController::getInstance().setDebug<minifi::io::Socket>();
LogTestController::getInstance().setDebug<minifi::io::TLSContext>();
LogTestController::getInstance().setDebug<core::ProcessSession>();
@@ -71,15 +71,17 @@
file.close();
}
- void cleanup() {
+ void cleanup() override {
LogTestController::getInstance().reset();
}
- void runAssertions() {
+ void runAssertions() override {
+ isRunning_ = false;
+ server_socket_.reset();
assert(LogTestController::getInstance().contains("send succeed 20"));
}
- void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
+ void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) override {
std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
assert(proc != nullptr);
@@ -107,8 +109,8 @@
if (host == "localhost") {
host = org::apache::nifi::minifi::io::Socket::getMyHostName();
}
- server_socket = std::make_shared<org::apache::nifi::minifi::io::TLSServerSocket>(socket_context, host, std::stoi(hostAndPort.at(1)), 3);
- server_socket->initialize();
+ server_socket_ = std::make_shared<org::apache::nifi::minifi::io::TLSServerSocket>(socket_context, host, std::stoi(hostAndPort.at(1)), 3);
+ server_socket_->initialize();
isRunning_ = true;
check = [this]() -> bool {
@@ -122,50 +124,7 @@
*size = 20;
return *size;
};
- server_socket->registerCallback(check, handler);
- }
-
- void run(std::string test_file_location) {
- testSetup();
-
- std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
- std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
-
- configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
-
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- content_repo->initialize(configuration);
- std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
- std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
- new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
-
- core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
-
- std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
- std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
-
- queryRootProcessGroup(pg);
-
- ptr.release();
-
- std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
-
- std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
- true);
- controller->load();
- controller->start();
- waitToVerifyProcessor();
- controller->waitUnload(60000);
- isRunning_ = false;
- server_socket->closeStream();
- server_socket = nullptr;
- runAssertions();
-
- cleanup();
- }
-
- virtual void waitToVerifyProcessor() {
- std::this_thread::sleep_for(std::chrono::seconds(3));
+ server_socket_->registerCallback(check, handler);
}
protected:
@@ -176,7 +135,7 @@
std::string dir;
std::stringstream ss;
TestController testController;
- std::shared_ptr<org::apache::nifi::minifi::io::TLSServerSocket> server_socket;
+ std::shared_ptr<org::apache::nifi::minifi::io::TLSServerSocket> server_socket_;
};
static void sigpipe_handle(int x) {
diff --git a/extensions/standard-processors/tests/integration/TailFileTest.cpp b/extensions/standard-processors/tests/integration/TailFileTest.cpp
index 3a460a9..7f1cb85 100644
--- a/extensions/standard-processors/tests/integration/TailFileTest.cpp
+++ b/extensions/standard-processors/tests/integration/TailFileTest.cpp
@@ -44,7 +44,7 @@
class TailFileTestHarness : public IntegrationBase {
public:
- TailFileTestHarness() {
+ TailFileTestHarness() : IntegrationBase(1000) {
char format[] = "/tmp/ssth.XXXXXX";
dir = testController.createTempDirectory(format);
@@ -57,25 +57,25 @@
file.close();
}
- void testSetup() {
+ void testSetup() override {
LogTestController::getInstance().setInfo<minifi::processors::LogAttribute>();
LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
LogTestController::getInstance().setTrace<minifi::FlowController>();
}
- virtual void cleanup() {
+ void cleanup() override {
unlink(ss.str().c_str());
unlink(statefile.c_str());
}
- virtual void runAssertions() {
+ void runAssertions() override {
assert(LogTestController::getInstance().contains("5 flowfiles were received from TailFile input") == true);
assert(LogTestController::getInstance().contains("Looking for delimiter 0xA") == true);
assert(LogTestController::getInstance().contains("li\\ne5") == true);
}
protected:
- virtual void updateProperties(std::shared_ptr<minifi::FlowController> fc) {
+ void updateProperties(std::shared_ptr<minifi::FlowController> fc) override {
for (auto &comp : fc->getComponents("tf")) {
std::shared_ptr<minifi::state::ProcessorController> proc = std::dynamic_pointer_cast<minifi::state::ProcessorController>(comp);
if (nullptr != proc) {
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index c247312..c51dd2c 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -18,6 +18,8 @@
#ifndef LIBMINIFI_TEST_INTEGRATION_INTEGRATIONBASE_H_
#define LIBMINIFI_TEST_INTEGRATION_INTEGRATIONBASE_H_
+#define DEFAULT_WAITTIME_MSECS 3000
+
#include "core/logging/Logger.h"
#include "core/ProcessGroup.h"
#include "core/yaml/YamlConfiguration.h"
@@ -31,7 +33,7 @@
class IntegrationBase {
public:
- IntegrationBase(uint64_t waitTime = 60000);
+ IntegrationBase(uint64_t waitTime = DEFAULT_WAITTIME_MSECS);
virtual ~IntegrationBase();
@@ -53,7 +55,7 @@
virtual void runAssertions() = 0;
virtual void waitToVerifyProcessor() {
- std::this_thread::sleep_for(std::chrono::seconds(3));
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_));
}
protected:
diff --git a/libminifi/test/resources/TestTailFile.yml b/libminifi/test/resources/TestTailFile.yml
index a1a78b9..e15a600 100644
--- a/libminifi/test/resources/TestTailFile.yml
+++ b/libminifi/test/resources/TestTailFile.yml
@@ -36,8 +36,7 @@
id: 2438e3c8-015a-1000-79ca-83af40ec1995
class: org.apache.nifi.processors.standard.LogAttribute
max concurrent tasks: 1
- scheduling strategy: TIMER_DRIVEN
- scheduling period: 500 msec
+ scheduling strategy: EVENT_DRIVEN
penalization period: 30 sec
yield period: 1 sec
run duration nanos: 0
@@ -45,6 +44,7 @@
- success
Properties:
Log Payload: true
+ FlowFiles To Log: 0
Connections:
- name: tr1
diff --git a/libminifi/test/resources/TestTailFileCron.yml b/libminifi/test/resources/TestTailFileCron.yml
index dd30937..fdf63c0 100644
--- a/libminifi/test/resources/TestTailFileCron.yml
+++ b/libminifi/test/resources/TestTailFileCron.yml
@@ -36,8 +36,7 @@
id: 2438e3c8-015a-1000-79ca-83af40ec1995
class: org.apache.nifi.processors.standard.LogAttribute
max concurrent tasks: 1
- scheduling strategy: TIMER_DRIVEN
- scheduling period: 500 msec
+ scheduling strategy: EVENT_DRIVEN
penalization period: 30 sec
yield period: 1 sec
run duration nanos: 0
@@ -45,6 +44,7 @@
- success
Properties:
Log Payload: true
+ FlowFiles To Log: 0
Connections:
- name: tr1