MINIFICPP-1281 - Use event polling instead of sleep by sync for simple multithreaded tests
MINIFICPP-1281 - Further improve test performance by changing the integration tests to poll for events while waiting
MINIFICPP-1281 - Decrease sleep and load in MinifiConcurrentQueueTests
MINIFICPP-1281 - Tweak integration test heartbeat periods
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #835
diff --git a/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp b/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
index e87819a..0425583 100644
--- a/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
+++ b/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
@@ -53,6 +53,7 @@
#include "CoapServer.h"
#include "io/BaseStream.h"
#include "concurrentqueue.h"
+#include "utils/IntegrationTestUtils.h"
class VerifyCoAPServer : public CoapIntegrationBase {
public:
@@ -82,12 +83,12 @@
}
void runAssertions() {
-
- std::this_thread::sleep_for(std::chrono::milliseconds(3000));
- assert(LogTestController::getInstance().contains("Received ack. version 3. number of operations 1") == true);
- assert(LogTestController::getInstance().contains("Received ack. version 3. number of operations 0") == true);
- assert(LogTestController::getInstance().contains("Received error event from protocol") == true);
- assert(LogTestController::getInstance().contains("Received op 1, with id id and operand operand") == true);
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(3),
+ "Received ack. version 3. number of operations 1",
+ "Received ack. version 3. number of operations 0",
+ "Received error event from protocol",
+ "Received op 1, with id id and operand operand"));
}
void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
index ceefc6d..1c4d264 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -51,6 +51,7 @@
std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+ configuration->set("c2.agent.heartbeat.period", "200");
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(configuration);
@@ -74,12 +75,12 @@
controller->load();
controller->start();
- waitToVerifyProcessor();
+
+ runAssertions();
shutdownBeforeFlowController();
controller->waitUnload(wait_time_);
controller->stopC2();
- runAssertions();
cleanup();
}
diff --git a/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp b/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp
index 40da9e2..975aa45 100644
--- a/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp
+++ b/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp
@@ -33,6 +33,7 @@
#include "integration/IntegrationBase.h"
#include "ProcessContextExpr.h"
#include "TestBase.h"
+#include "utils/IntegrationTestUtils.h"
class TestHarness : public IntegrationBase {
public:
@@ -46,9 +47,11 @@
void cleanup() override {}
void runAssertions() override {
- assert(LogTestController::getInstance().contains("key:route_check_attr value:good"));
- assert(LogTestController::getInstance().contains("key:variable_attribute value:replacement_value"));
- assert(LogTestController::getInstance().contains("ProcessSession rollback", std::chrono::seconds(1)) == false); // No rollback happened
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+ "key:route_check_attr value:good",
+ "key:variable_attribute value:replacement_value"));
+ assert(false == verifyLogLinePresenceInPollTime(std::chrono::milliseconds(200), "ProcessSession rollback")); // No rollback happened
}
void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) override {
diff --git a/extensions/http-curl/tests/C2FailedUpdateTest.cpp b/extensions/http-curl/tests/C2FailedUpdateTest.cpp
index cc18b3f..599a901 100644
--- a/extensions/http-curl/tests/C2FailedUpdateTest.cpp
+++ b/extensions/http-curl/tests/C2FailedUpdateTest.cpp
@@ -19,6 +19,7 @@
#undef NDEBUG
#include "HTTPIntegrationBase.h"
#include "HTTPHandlers.h"
+#include "utils/IntegrationTestUtils.h"
int main(int argc, char **argv) {
const cmd_args args = parse_cmdline_args(argc, argv, "update");
diff --git a/extensions/http-curl/tests/C2JstackTest.cpp b/extensions/http-curl/tests/C2JstackTest.cpp
index 77a315f..3386fbb 100644
--- a/extensions/http-curl/tests/C2JstackTest.cpp
+++ b/extensions/http-curl/tests/C2JstackTest.cpp
@@ -21,11 +21,13 @@
#include "TestBase.h"
#include "HTTPIntegrationBase.h"
#include "HTTPHandlers.h"
+#include "utils/IntegrationTestUtils.h"
class VerifyC2DescribeJstack : public VerifyC2Describe {
public:
void runAssertions() override {
- assert(LogTestController::getInstance().contains("SchedulingAgent"));
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_), "SchedulingAgent"));
}
};
diff --git a/extensions/http-curl/tests/C2NullConfiguration.cpp b/extensions/http-curl/tests/C2NullConfiguration.cpp
index 07ebeae..34db113 100644
--- a/extensions/http-curl/tests/C2NullConfiguration.cpp
+++ b/extensions/http-curl/tests/C2NullConfiguration.cpp
@@ -30,6 +30,7 @@
#include "c2/C2Agent.h"
#include "processors/LogAttribute.h"
#include "HTTPIntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
class VerifyC2Server : public HTTPIntegrationBase {
public:
@@ -57,8 +58,10 @@
}
void runAssertions() override {
- assert(LogTestController::getInstance().contains("C2Agent] [debug] Could not instantiate null"));
- assert(LogTestController::getInstance().contains("Class is RESTSender"));
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+ "C2Agent] [debug] Could not instantiate null",
+ "Class is RESTSender"));
}
void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) override {
diff --git a/extensions/http-curl/tests/C2UpdateAgentTest.cpp b/extensions/http-curl/tests/C2UpdateAgentTest.cpp
index 53ce3d7..3ab4409 100644
--- a/extensions/http-curl/tests/C2UpdateAgentTest.cpp
+++ b/extensions/http-curl/tests/C2UpdateAgentTest.cpp
@@ -34,6 +34,6 @@
const auto then = std::chrono::system_clock::now();
const auto seconds = std::chrono::duration_cast<std::chrono::seconds>(then - start).count();
- assert(handler.calls_ <= (seconds) + 1);
+ assert(handler.calls_ <= (seconds) + 2);
return 0;
}
diff --git a/extensions/http-curl/tests/C2UpdateTest.cpp b/extensions/http-curl/tests/C2UpdateTest.cpp
index 0475525..2c85a79 100644
--- a/extensions/http-curl/tests/C2UpdateTest.cpp
+++ b/extensions/http-curl/tests/C2UpdateTest.cpp
@@ -19,6 +19,7 @@
#undef NDEBUG
#include "HTTPIntegrationBase.h"
#include "HTTPHandlers.h"
+#include "utils/IntegrationTestUtils.h"
int main(int argc, char **argv) {
const cmd_args args = parse_cmdline_args(argc, argv, "update");
diff --git a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
index 65c2821..40ccbd7 100644
--- a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
+++ b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
@@ -24,6 +24,7 @@
#include "protocols/RESTReceiver.h"
#include "HTTPIntegrationBase.h"
#include "HTTPHandlers.h"
+#include "utils/IntegrationTestUtils.h"
class LightWeightC2Handler : public HeartbeatHandler {
public:
@@ -51,9 +52,11 @@
}
void runAssertions() override {
- assert(LogTestController::getInstance().contains("Received Ack from Server"));
- assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component invoke"));
- assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController"));
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+ "Received Ack from Server",
+ "C2Agent] [debug] Stopping component invoke",
+ "C2Agent] [debug] Stopping component FlowController"));
}
void configureFullHeartbeat() override {
@@ -62,7 +65,7 @@
};
class VerifyLightWeightC2Heartbeat : public VerifyC2Heartbeat {
-public:
+ public:
void configureFullHeartbeat() override {
configuration->set("nifi.c2.full.heartbeat", "false");
}
diff --git a/extensions/http-curl/tests/C2VerifyServeResults.cpp b/extensions/http-curl/tests/C2VerifyServeResults.cpp
index 2f9f36a..e814e7c 100644
--- a/extensions/http-curl/tests/C2VerifyServeResults.cpp
+++ b/extensions/http-curl/tests/C2VerifyServeResults.cpp
@@ -26,6 +26,7 @@
#include "properties/Configure.h"
#include "TestServer.h"
#include "HTTPIntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
class VerifyC2Server : public HTTPIntegrationBase {
public:
@@ -49,8 +50,10 @@
}
void runAssertions() override {
- assert(LogTestController::getInstance().contains("Import offset 0"));
- assert(LogTestController::getInstance().contains("Outputting success and response"));
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+ "Import offset 0",
+ "Outputting success and response"));
}
void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) override {
diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
index 0917f50..1ada032 100644
--- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
+++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
@@ -36,6 +36,7 @@
#include "unit/MockClasses.h"
#include "unit/ProvenanceTestHelper.h"
#include "integration/IntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
REGISTER_RESOURCE(MockControllerService, "");
REGISTER_RESOURCE(MockProcessor, "");
@@ -45,6 +46,7 @@
}
int main(int argc, char **argv) {
+ using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
const cmd_args args = parse_cmdline_args(argc, argv);
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
@@ -106,35 +108,27 @@
assert(ssl_client->getCACertificate().length() > 0);
// now let's disable one of the controller services.
std::shared_ptr<core::controller::ControllerServiceNode> cs_id = controller->getControllerServiceNode("ID");
+ const auto checkCsIdEnabledMatchesDisabledFlag = [&cs_id] { return !disabled == cs_id->enabled(); };
assert(cs_id != nullptr);
{
std::lock_guard<std::mutex> lock(control_mutex);
- controller->disableControllerService(cs_id);
- disabled = true;
- waitToVerifyProcessor();
- }
- {
- std::lock_guard<std::mutex> lock(control_mutex);
controller->enableControllerService(cs_id);
disabled = false;
- waitToVerifyProcessor();
}
std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995");
- assert(cs_id->enabled());
-{
+ assert(verifyEventHappenedInPollTime(std::chrono::seconds(4), checkCsIdEnabledMatchesDisabledFlag));
+ {
std::lock_guard<std::mutex> lock(control_mutex);
controller->disableReferencingServices(mock_cont);
disabled = true;
- waitToVerifyProcessor();
}
- assert(cs_id->enabled() == false);
-{
+ assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag));
+ {
std::lock_guard<std::mutex> lock(control_mutex);
controller->enableReferencingServices(mock_cont);
disabled = false;
- waitToVerifyProcessor();
}
- assert(cs_id->enabled() == true);
+ assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag));
controller->waitUnload(60000);
return 0;
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index c8bbb69..9e1940a 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -24,6 +24,7 @@
#include "protocols/RESTSender.h"
#include "ServerAwareHandler.h"
#include "TestBase.h"
+#include "utils/IntegrationTestUtils.h"
#include "TestServer.h"
int log_message(const struct mg_connection *conn, const char *message) {
@@ -130,6 +131,9 @@
}
void runAssertions() override {
+ // This class is never used for running assertions, but we are forced to wait for DescribeManifestHandler to verifyJsonHasAgentManifest
+ // if we were to log something on finished verification, we could poll on finding it
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_));
}
};
@@ -158,7 +162,8 @@
}
void runAssertions() override {
- assert(LogTestController::getInstance().contains("Starting to reload Flow Controller with flow control name MiNiFi Flow, version"));
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(10), "Starting to reload Flow Controller with flow control name MiNiFi Flow, version"));
}
};
@@ -179,8 +184,8 @@
}
void runAssertions() override {
- assert(LogTestController::getInstance().contains("removing file"));
- assert(LogTestController::getInstance().contains("May not have command processor"));
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(10), "removing file", "May not have command processor"));
}
};
@@ -197,8 +202,8 @@
}
void runAssertions() override {
- assert(LogTestController::getInstance().contains("Invalid configuration payload"));
- assert(LogTestController::getInstance().contains("update failed"));
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(10), "Invalid configuration payload", "update failed."));
}
void cleanup() override {
diff --git a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
index febef4f..f504c90 100644
--- a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
+++ b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
@@ -65,7 +65,10 @@
void cleanup() override {}
- void runAssertions() override {}
+ void runAssertions() override {
+ // There is nothing to verify here, but we are expected to wait for all paralell events to execute
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_));
+ }
protected:
bool isSecure;
diff --git a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
index 0efffb5..b2dd31a 100644
--- a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
@@ -36,12 +36,17 @@
#include "unit/ProvenanceTestHelper.h"
#include "io/StreamFactory.h"
#include "processors/LogAttribute.h"
-#include "HTTPIntegrationBase.h"
+#include "integration/IntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
-namespace {
- void waitToVerifyProcessor() {
- std::this_thread::sleep_for(std::chrono::seconds(10));
- }
+int log_message(const struct mg_connection *conn, const char *message) {
+ puts(message);
+ return 1;
+}
+
+int ssl_enable(void* /*ssl_context*/, void* /*user_data*/) {
+ puts("Enable ssl");
+ return 0;
}
class HttpResponder : public CivetHandler {
@@ -59,6 +64,7 @@
};
int main(int argc, char **argv) {
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
const cmd_args args = parse_cmdline_args(argc, argv);
LogTestController::getInstance().setDebug<core::Processor>();
@@ -119,19 +125,19 @@
}
controller->load();
controller->start();
- waitToVerifyProcessor();
+
+ assert(verifyLogLinePresenceInPollTime(
+ std::chrono::seconds(10),
+ "key:filename value:",
+ "key:invokehttp.request.url value:" + url,
+ "key:invokehttp.status.code value:200",
+ "key:flow.id"));
controller->waitUnload(60000);
if (url.find("localhost") == std::string::npos) {
server.reset();
exit(1);
}
- std::string logs = LogTestController::getInstance().log_output.str();
-
- assert(logs.find("key:filename value:") != std::string::npos);
- assert(logs.find("key:invokehttp.request.url value:" + url) != std::string::npos);
- assert(logs.find("key:invokehttp.status.code value:200") != std::string::npos);
- assert(logs.find("key:flow.id") != std::string::npos);
LogTestController::getInstance().reset();
return 0;
diff --git a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
index 71c2957..b36f811 100644
--- a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
@@ -28,6 +28,7 @@
#include "core/ProcessGroup.h"
#include "FlowController.h"
#include "HTTPIntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
class HttpTestHarness : public HTTPIntegrationBase {
public:
@@ -65,9 +66,11 @@
}
void runAssertions() override {
- assert(LogTestController::getInstance().contains("curl performed"));
- assert(LogTestController::getInstance().contains("Size:1024 Offset:0"));
- assert(!LogTestController::getInstance().contains("Size:0 Offset:0"));
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+ "curl performed",
+ "Size:1024 Offset:0"));
+ assert(false == verifyLogLinePresenceInPollTime(std::chrono::milliseconds(200), "Size:0 Offset:0"));
}
protected:
diff --git a/extensions/http-curl/tests/SiteToSiteRestTest.cpp b/extensions/http-curl/tests/SiteToSiteRestTest.cpp
index 2c58b9b..cfa09d7 100644
--- a/extensions/http-curl/tests/SiteToSiteRestTest.cpp
+++ b/extensions/http-curl/tests/SiteToSiteRestTest.cpp
@@ -30,6 +30,7 @@
#include "core/ConfigurableComponent.h"
#include "controllers/SSLContextService.h"
#include "HTTPIntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
class Responder : public ServerAwareHandler {
public:
@@ -86,12 +87,13 @@
}
void runAssertions() override {
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
if (isSecure) {
- assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 1"));
+ assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_), "process group remote site2site port 10001, is secure 1"));
} else {
- assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 0"));
+ assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_), "process group remote site2site port 10001, is secure 0"));
}
- assert(LogTestController::getInstance().contains("ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed "));
+ assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_), "ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed "));
}
protected:
diff --git a/extensions/http-curl/tests/ThreadPoolAdjust.cpp b/extensions/http-curl/tests/ThreadPoolAdjust.cpp
index c06966e..fffbf10 100644
--- a/extensions/http-curl/tests/ThreadPoolAdjust.cpp
+++ b/extensions/http-curl/tests/ThreadPoolAdjust.cpp
@@ -28,6 +28,7 @@
#include "FlowController.h"
#include "HTTPIntegrationBase.h"
#include "processors/LogAttribute.h"
+#include "utils/IntegrationTestUtils.h"
class HttpTestHarness : public IntegrationBase {
public:
@@ -64,9 +65,11 @@
}
void runAssertions() override {
- assert(LogTestController::getInstance().contains("curl performed"));
- assert(LogTestController::getInstance().contains("Size:1024 Offset:0"));
- assert(!LogTestController::getInstance().contains("Size:0 Offset:0"));
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+ "curl performed",
+ "Size:1024 Offset:0"));
+ assert(false == verifyLogLinePresenceInPollTime(std::chrono::milliseconds(200), "Size:0 Offset:0"));
}
protected:
diff --git a/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
index f723793..99e5a01 100644
--- a/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
+++ b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
@@ -66,7 +66,10 @@
void cleanup() override {}
- void runAssertions() override {}
+ void runAssertions() override {
+ // There is nothing to verify here, but we are expected to wait for all paralell events to execute
+ std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_));
+ }
protected:
bool isSecure;
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
index 74e5324..a0bf58b 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
+++ b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
@@ -26,9 +26,10 @@
#include "CivetServer.h"
#include "HTTPIntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
class VerifyInvokeHTTP : public HTTPIntegrationBase {
-public:
+ public:
VerifyInvokeHTTP()
: HTTPIntegrationBase(6000) {
}
@@ -55,7 +56,7 @@
std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
configuration->set(minifi::Configure::nifi_flow_configuration_file, flow_yml_path);
-
+ configuration->set("c2.agent.heartbeat.period", "200");
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(configuration);
std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
@@ -79,7 +80,8 @@
setupFlow(flow_yml_path);
startFlowController();
- waitToVerifyProcessor();
+ runAssertions();
+
shutdownBeforeFlowController();
stopFlowController();
}
@@ -92,47 +94,55 @@
flowController_->unload();
flowController_->stopC2();
- runAssertions();
cleanup();
}
};
class VerifyInvokeHTTPOKResponse : public VerifyInvokeHTTP {
-public:
+ public:
void runAssertions() override {
- assert(LogTestController::getInstance().contains("key:invokehttp.status.code value:201"));
- assert(LogTestController::getInstance().contains("response code 201"));
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(6),
+ "key:invokehttp.status.code value:201",
+ "response code 201"));
}
};
class VerifyCouldNotConnectInvokeHTTP : public VerifyInvokeHTTP {
-public:
+ public:
void runAssertions() override {
- assert(LogTestController::getInstance().contains("key:invoke_http value:failure"));
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(6), "key:invoke_http value:failure"));
}
};
class VerifyNoRetryInvokeHTTP : public VerifyInvokeHTTP {
-public:
+ public:
void runAssertions() override {
- assert(LogTestController::getInstance().contains("key:invokehttp.status.message value:HTTP/1.1 404 Not Found"));
- assert(LogTestController::getInstance().contains("isSuccess: 0, response code 404"));
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(6),
+ "key:invokehttp.status.message value:HTTP/1.1 404 Not Found",
+ "isSuccess: 0, response code 404"));
}
};
class VerifyRetryInvokeHTTP : public VerifyInvokeHTTP {
-public:
+ public:
void runAssertions() override {
- assert(LogTestController::getInstance().contains("key:invokehttp.status.message value:HTTP/1.1 501 Not Implemented"));
- assert(LogTestController::getInstance().contains("isSuccess: 0, response code 501"));
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(6),
+ "key:invokehttp.status.message value:HTTP/1.1 501 Not Implemented",
+ "isSuccess: 0, response code 501"));
}
};
class VerifyRWTimeoutInvokeHTTP : public VerifyInvokeHTTP {
-public:
+ public:
void runAssertions() override {
- assert(LogTestController::getInstance().contains("key:invoke_http value:failure"));
- assert(LogTestController::getInstance().contains("limit (1000ms) reached, terminating connection"));
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(6),
+ "key:invoke_http value:failure",
+ "limit (1000ms) reached, terminating connection"));
}
};
@@ -160,7 +170,7 @@
harness.setupFlow(args.test_file);
harness.shutdownBeforeFlowController();
harness.startFlowController();
- harness.waitToVerifyProcessor();
+ harness.runAssertions();
harness.stopFlowController();
}
@@ -183,7 +193,7 @@
}
{
- TimeoutingHTTPHandler handler({std::chrono::seconds(4)});
+ TimeoutingHTTPHandler handler({std::chrono::seconds(2)});
VerifyRWTimeoutInvokeHTTP harness;
run(harness, args.url, args.test_file, args.key_dir, &handler);
}
diff --git a/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp b/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
index 0c65035..8505ce6 100644
--- a/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
+++ b/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
@@ -23,26 +23,36 @@
#include "../../../libminifi/test/TestBase.h"
#include "../PublishKafka.h"
#include "utils/StringUtils.h"
+#include "utils/IntegrationTestUtils.h"
class PublishKafkaOnScheduleTests : public IntegrationBase {
public:
virtual void runAssertions() {
- std::string logs = LogTestController::getInstance().log_output.str();
+ using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+ assert(verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
+ const std::string logs = LogTestController::getInstance().log_output.str();
+ const auto result = utils::StringUtils::countOccurrences(logs, "value 1 is outside allowed range 1000..1000000000");
+ const int occurrences = result.second;
+ return 1 < occurrences;
+ }));
+ flowController_->updatePropertyValue("kafka", minifi::processors::PublishKafka::MaxMessageSize.getName(), "1999");
+ const std::vector<std::string> must_appear_byorder_msgs = {"notifyStop called",
+ "Successfully configured PublishKafka",
+ "PublishKafka onTrigger"};
- auto result = utils::StringUtils::countOccurrences(logs, "value 1 is outside allowed range 1000..1000000000");
- size_t last_pos = result.first;
- int occurrences = result.second;
-
- assert(occurrences > 1); // Verify retry of onSchedule and onUnSchedule calls
-
- std::vector<std::string> must_appear_byorder_msgs = {"notifyStop called",
- "Successfully configured PublishKafka",
- "PublishKafka onTrigger"};
-
- for (const auto &msg : must_appear_byorder_msgs) {
- last_pos = logs.find(msg, last_pos);
- assert(last_pos != std::string::npos);
- }
+ const bool test_success = verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
+ const std::string logs = LogTestController::getInstance().log_output.str();
+ const auto result = utils::StringUtils::countOccurrences(logs, "value 1 is outside allowed range 1000..1000000000");
+ size_t last_pos = result.first;
+ for (const std::string& msg : must_appear_byorder_msgs) {
+ last_pos = logs.find(msg, last_pos);
+ if (last_pos == std::string::npos) {
+ return false;
+ }
+ }
+ return true;
+ });
+ assert(test_success);
}
virtual void testSetup() {
@@ -52,12 +62,6 @@
LogTestController::getInstance().setDebug<minifi::processors::PublishKafka>();
}
- virtual void waitToVerifyProcessor() {
- std::this_thread::sleep_for(std::chrono::seconds(3));
- flowController_->updatePropertyValue("kafka", minifi::processors::PublishKafka::MaxMessageSize.getName(), "1999");
- std::this_thread::sleep_for(std::chrono::seconds(3));
- }
-
virtual void cleanup() {}
};
diff --git a/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp b/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp
index 96982ae..32fe6c6 100644
--- a/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp
+++ b/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp
@@ -49,6 +49,7 @@
#include "processors/LogAttribute.h"
#include "io/tls/TLSSocket.h"
#include "io/tls/TLSServerSocket.h"
+#include "utils/IntegrationTestUtils.h"
class SecureSocketTest : public IntegrationBase {
public:
@@ -76,9 +77,10 @@
}
void runAssertions() override {
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
isRunning_ = false;
server_socket_.reset();
- assert(LogTestController::getInstance().contains("send succeed 20"));
+ assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_), "send succeed 20"));
}
void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) override {
diff --git a/extensions/standard-processors/tests/integration/TailFileTest.cpp b/extensions/standard-processors/tests/integration/TailFileTest.cpp
index 7f1cb85..9229677 100644
--- a/extensions/standard-processors/tests/integration/TailFileTest.cpp
+++ b/extensions/standard-processors/tests/integration/TailFileTest.cpp
@@ -41,6 +41,7 @@
#include "processors/LogAttribute.h"
#include "state/ProcessorController.h"
#include "integration/IntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
class TailFileTestHarness : public IntegrationBase {
public:
@@ -69,9 +70,11 @@
}
void runAssertions() override {
- assert(LogTestController::getInstance().contains("5 flowfiles were received from TailFile input") == true);
- assert(LogTestController::getInstance().contains("Looking for delimiter 0xA") == true);
- assert(LogTestController::getInstance().contains("li\\ne5") == true);
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+ "5 flowfiles were received from TailFile input",
+ "Looking for delimiter 0xA",
+ "li\\ne5"));
}
protected:
diff --git a/extensions/standard-processors/tests/unit/GetFileTests.cpp b/extensions/standard-processors/tests/unit/GetFileTests.cpp
index 273977c..53e393f 100644
--- a/extensions/standard-processors/tests/unit/GetFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/GetFileTests.cpp
@@ -66,8 +66,6 @@
in_file_stream << "The quick brown fox jumps over the lazy dog" << std::endl;
in_file_stream.close();
- std::this_thread::sleep_for(std::chrono::seconds(2));
-
in_file_stream.open(in_file + "2");
in_file_stream << "The quick brown fox jumps over the lazy dog who is 2 legit to quit" << std::endl;
in_file_stream.close();
diff --git a/libminifi/include/utils/IntegrationTestUtils.h b/libminifi/include/utils/IntegrationTestUtils.h
new file mode 100644
index 0000000..9c3db9d
--- /dev/null
+++ b/libminifi/include/utils/IntegrationTestUtils.h
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <utility>
+#include <string>
+#include <vector>
+
+#include "../../../libminifi/test/TestBase.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <class Rep, class Period, typename Fun>
+bool verifyEventHappenedInPollTime(const std::chrono::duration<Rep, Period>& wait_duration, Fun&& check) {
+ std::chrono::system_clock::time_point wait_end = std::chrono::system_clock::now() + wait_duration;
+ do {
+ if (std::forward<Fun>(check)()) {
+ return true;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ } while (std::chrono::system_clock::now() < wait_end);
+ return false;
+}
+
+template <class Rep, class Period, typename ...String>
+bool verifyLogLinePresenceInPollTime(const std::chrono::duration<Rep, Period>& wait_duration, String&&... patterns) {
+ // gcc before 4.9 does not support capturing parameter packs in lambdas: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=41933
+ // Once we support gcc >= 4.9 only, this vector will no longer be necessary, we'll be able to iterate on the parameter pack directly.
+ std::vector<std::string> pattern_list{std::forward<String>(patterns)...};
+ auto check = [&] {
+ const std::string logs = LogTestController::getInstance().log_output.str();
+ return std::all_of(pattern_list.cbegin(), pattern_list.cend(), [&logs] (const std::string& pattern) { return logs.find(pattern) != std::string::npos; });
+ };
+ return verifyEventHappenedInPollTime(wait_duration, check);
+}
+
+} // namespace utils
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 65a4a16..3a24eef 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -55,10 +55,6 @@
virtual void runAssertions() = 0;
- virtual void waitToVerifyProcessor() {
- std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_));
- }
-
protected:
virtual void configureC2() {
@@ -134,13 +130,13 @@
flowController_->load();
updateProperties(flowController_);
flowController_->start();
- waitToVerifyProcessor();
+
+ runAssertions();
shutdownBeforeFlowController();
flowController_->unload();
flowController_->stopC2();
- runAssertions();
cleanup();
}
diff --git a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
index 285157d..8b3d28e 100644
--- a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
+++ b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
@@ -26,31 +26,42 @@
#include "../TestBase.h"
#include "../KamikazeProcessor.h"
#include "utils/StringUtils.h"
+#include "utils/IntegrationTestUtils.h"
/*Verify behavior in case exceptions are thrown in onSchedule or onTrigger functions
* KamikazeProcessor is a test processor to trigger errors in these functions */
class KamikazeErrorHandlingTests : public IntegrationBase {
public:
void runAssertions() override {
- std::string logs = LogTestController::getInstance().log_output.str();
+ using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+ assert(verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
+ const std::string logs = LogTestController::getInstance().log_output.str();
+ const auto result = utils::StringUtils::countOccurrences(logs, minifi::processors::KamikazeProcessor::OnScheduleExceptionStr);
+ const int occurrences = result.second;
+ return 1 < occurrences;
+ }));
+ flowController_->updatePropertyValue("kamikaze", minifi::processors::KamikazeProcessor::ThrowInOnSchedule.getName(), "false");
- auto result = utils::StringUtils::countOccurrences(logs, minifi::processors::KamikazeProcessor::OnScheduleExceptionStr);
- size_t last_pos = result.first;
- int occurrences = result.second;
-
- assert(occurrences > 1); // Verify retry of onSchedule and onUnSchedule calls
-
- std::vector<std::string> must_appear_byorder_msgs = {minifi::processors::KamikazeProcessor::OnUnScheduleLogStr,
+ const std::vector<std::string> must_appear_byorder_msgs = {minifi::processors::KamikazeProcessor::OnUnScheduleLogStr,
minifi::processors::KamikazeProcessor::OnScheduleLogStr,
minifi::processors::KamikazeProcessor::OnTriggerExceptionStr,
"[warning] ProcessSession rollback for kamikaze executed"};
- for (const auto &msg : must_appear_byorder_msgs) {
- last_pos = logs.find(msg, last_pos);
- assert(last_pos != std::string::npos);
- }
+ const bool test_success = verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
+ const std::string logs = LogTestController::getInstance().log_output.str();
+ const auto result = utils::StringUtils::countOccurrences(logs, minifi::processors::KamikazeProcessor::OnScheduleExceptionStr);
+ size_t last_pos = result.first;
+ for (const std::string& msg : must_appear_byorder_msgs) {
+ last_pos = logs.find(msg, last_pos);
+ if (last_pos == std::string::npos) {
+ return false;
+ }
+ }
+ return true;
+ });
+ assert(test_success);
- assert(logs.find(minifi::processors::KamikazeProcessor::OnTriggerLogStr) == std::string::npos);
+ assert(LogTestController::getInstance().log_output.str().find(minifi::processors::KamikazeProcessor::OnTriggerLogStr) == std::string::npos);
}
void testSetup() override {
@@ -60,12 +71,6 @@
LogTestController::getInstance().setDebug<minifi::processors::KamikazeProcessor>();
}
- void waitToVerifyProcessor() override {
- std::this_thread::sleep_for(std::chrono::seconds(3));
- flowController_->updatePropertyValue("kamikaze", minifi::processors::KamikazeProcessor::ThrowInOnSchedule.getName(), "false");
- std::this_thread::sleep_for(std::chrono::seconds(3));
- }
-
void cleanup() override {}
};
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp
index c968230..ba9d496 100644
--- a/libminifi/test/integration/ProvenanceReportingTest.cpp
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -38,12 +38,11 @@
#include "../unit/ProvenanceTestHelper.h"
#include "io/StreamFactory.h"
#include "../TestBase.h"
-
-void waitToVerifyProcessor() {
- std::this_thread::sleep_for(std::chrono::seconds(2));
-}
+#include "utils/IntegrationTestUtils.h"
int main(int argc, char **argv) {
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+
std::string test_file_location;
if (argc > 1) {
test_file_location = argv[1];
@@ -80,11 +79,10 @@
controller->load();
controller->start();
- waitToVerifyProcessor();
+
+ assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(std::chrono::seconds(2)), "Add processor SiteToSiteProvenanceReportingTask into process group MiNiFi Flow"));
controller->waitUnload(60000);
- std::string logs = LogTestController::getInstance().log_output.str();
- assert(logs.find("Add processor SiteToSiteProvenanceReportingTask into process group MiNiFi Flow") != std::string::npos);
LogTestController::getInstance().reset();
rmdir("./content_repository");
rmdir("/tmp/aljs39/");
diff --git a/libminifi/test/pcap-tests/PcapTest.cpp b/libminifi/test/pcap-tests/PcapTest.cpp
index eea0364..2cbaa35 100644
--- a/libminifi/test/pcap-tests/PcapTest.cpp
+++ b/libminifi/test/pcap-tests/PcapTest.cpp
@@ -43,6 +43,7 @@
#include "core/state/ProcessorController.h"
#include "../integration/IntegrationBase.h"
#include "CapturePacket.h"
+#include "utils/IntegrationTestUtils.h"
class PcapTestHarness : public IntegrationBase {
public:
@@ -66,10 +67,13 @@
}
void runAssertions() {
- assert(LogTestController::getInstance().contains("Starting capture") == true);
- assert(LogTestController::getInstance().contains("Stopping capture") == true);
- assert(LogTestController::getInstance().contains("Stopped device capture. clearing queues") == true);
- assert(LogTestController::getInstance().contains("Accepting ") == true && LogTestController::getInstance().contains("because it matches .*") );
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+ "Starting capture",
+ "Stopping capture",
+ "Stopped device capture. clearing queues",
+ "Accepting ",
+ "because it matches .*"));
}
void updateProperties(std::shared_ptr<minifi::FlowController> fc) {
diff --git a/libminifi/test/sensors-tests/SensorTests.cpp b/libminifi/test/sensors-tests/SensorTests.cpp
index 40047f5..83fbc4d 100644
--- a/libminifi/test/sensors-tests/SensorTests.cpp
+++ b/libminifi/test/sensors-tests/SensorTests.cpp
@@ -42,6 +42,7 @@
#include "core/ConfigurableComponent.h"
#include "../integration/IntegrationBase.h"
#include "GetEnvironmentalSensors.h"
+#include "utils/IntegrationTestUtils.h"
class PcapTestHarness : public IntegrationBase {
public:
@@ -64,7 +65,8 @@
}
void runAssertions() {
- assert(LogTestController::getInstance().contains("Initializing EnvironmentalSensors") == true);
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_), "Initializing EnvironmentalSensors"));
}
void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
diff --git a/libminifi/test/unit/BackTraceTests.cpp b/libminifi/test/unit/BackTraceTests.cpp
index 37e8aaa..19ccedf 100644
--- a/libminifi/test/unit/BackTraceTests.cpp
+++ b/libminifi/test/unit/BackTraceTests.cpp
@@ -75,7 +75,7 @@
std::atomic<int> counter;
int counterFunction() {
- std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ std::this_thread::sleep_for(std::chrono::milliseconds(150));
return ++counter;
}
@@ -83,7 +83,7 @@
counter = 0;
utils::ThreadPool<int> pool(4);
pool.start();
- std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ std::this_thread::sleep_for(std::chrono::milliseconds(150));
for (int i = 0; i < 3; i++) {
std::function<int()> f_ex = counterFunction;
std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new WorkerNumberExecutions(5));
diff --git a/libminifi/test/unit/MinifiConcurrentQueueTests.cpp b/libminifi/test/unit/MinifiConcurrentQueueTests.cpp
index 76b7642..1b0dc0f 100644
--- a/libminifi/test/unit/MinifiConcurrentQueueTests.cpp
+++ b/libminifi/test/unit/MinifiConcurrentQueueTests.cpp
@@ -26,25 +26,10 @@
#include "../TestBase.h"
#include "utils/MinifiConcurrentQueue.h"
#include "utils/StringUtils.h"
+#include "utils/IntegrationTestUtils.h"
namespace utils = org::apache::nifi::minifi::utils;
-namespace {
-
- template<typename Function, typename Rep, typename Period>
- bool becomesTrueWithinTimeout(const Function &condition, std::chrono::duration<Rep, Period> timeout) {
- auto start_time = std::chrono::steady_clock::now();
- while (std::chrono::steady_clock::now() < start_time + timeout) {
- if (condition()) {
- return true;
- }
- std::this_thread::sleep_for(std::chrono::milliseconds{1});
- }
- return false;
- }
-
-} // namespace
-
namespace MinifiConcurrentQueueTestProducersConsumers {
// Producers
@@ -75,7 +60,7 @@
std::thread getSimpleTryDequeConsumerThread(utils::ConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
return std::thread([&queue, &results] {
- constexpr std::size_t max_read_attempts = 1000;
+ constexpr std::size_t max_read_attempts = 300;
for (std::size_t attempt_num = 0; results.size() < 3 && attempt_num < max_read_attempts; ++attempt_num) {
std::string s;
if (queue.tryDequeue(s)) {
@@ -89,7 +74,7 @@
std::thread getSimpleConsumeConsumerThread(utils::ConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
return std::thread([&queue, &results] {
- constexpr std::size_t max_read_attempts = 1000;
+ constexpr std::size_t max_read_attempts = 300;
for (std::size_t attempt_num = 0; results.size() < 3 && attempt_num < max_read_attempts; ++attempt_num) {
if (!queue.consume([&results] (const std::string& s) { results.push_back(s); })) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -146,7 +131,7 @@
std::thread getDequeueWaitForConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
return std::thread([&queue, &results] {
- constexpr std::size_t max_read_attempts = 1000;
+ constexpr std::size_t max_read_attempts = 300;
for (std::size_t attempt_num = 0; results.size() < 3 && attempt_num < max_read_attempts; ++attempt_num) {
std::string s;
if (queue.dequeueWaitFor(s, std::chrono::milliseconds(1))) {
@@ -158,7 +143,7 @@
std::thread getDequeueWaitUntilConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
return std::thread([&queue, &results] {
- constexpr std::size_t max_read_attempts = 1000;
+ constexpr std::size_t max_read_attempts = 300;
for (std::size_t attempt_num = 0; results.size() < 3 && attempt_num < max_read_attempts; ++attempt_num) {
std::string s;
const std::chrono::system_clock::time_point timeout_point = std::chrono::system_clock::now() + std::chrono::milliseconds(1);
@@ -171,7 +156,7 @@
std::thread getConsumeWaitForConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
return std::thread([&queue, &results]() {
- constexpr std::size_t max_read_attempts = 1000;
+ constexpr std::size_t max_read_attempts = 300;
for (std::size_t attempt_num = 0; results.size() < 3 && attempt_num < max_read_attempts; ++attempt_num) {
queue.consumeWaitFor([&results] (const std::string& s) { results.push_back(s); }, std::chrono::milliseconds(1));
}
@@ -311,7 +296,7 @@
producer.join();
auto queue_is_empty = [&queue]() { return queue.empty(); };
- REQUIRE(becomesTrueWithinTimeout(queue_is_empty, std::chrono::seconds{1}));
+ REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds(1), queue_is_empty));
queue.stop();
consumer.join();
@@ -330,7 +315,7 @@
std::thread producer { getSimpleProducerThread(queue) };
auto we_have_all_results = [&results_size]() { return results_size >= 3; };
- REQUIRE(becomesTrueWithinTimeout(we_have_all_results, std::chrono::seconds{1}));
+ REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds(1), we_have_all_results));
queue.stop();
producer.join();
@@ -366,7 +351,7 @@
std::mt19937 rng(dev());
std::uniform_int_distribution<std::mt19937::result_type> dist(1, std::numeric_limits<int>::max());
- std::vector<int> source(1000000);
+ std::vector<int> source(50000);
std::vector<int> target;
generate(source.begin(), source.end(), [&rng, &dist](){ return dist(rng); });
@@ -380,7 +365,7 @@
std::thread relay([&queue, &cqueue]() {
size_t cnt = 0;
- while (cnt < 1000000) {
+ while (cnt < 50000) {
int i;
if (queue.tryDequeue(i)) {
cnt++;
@@ -400,7 +385,7 @@
relay.join();
auto queue_is_empty = [&cqueue]() { return cqueue.empty(); };
- REQUIRE(becomesTrueWithinTimeout(queue_is_empty, std::chrono::seconds{1}));
+ REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds(1), queue_is_empty));
cqueue.stop();
consumer.join();