MINIFICPP-1281 - Use event polling instead of sleep by sync for simple multithreaded tests

MINIFICPP-1281 - Further improve test performance by changing the integration tests to poll for events while waiting

MINIFICPP-1281 - Decrease sleep and load in MinifiConcurrentQueueTests

MINIFICPP-1281 - Tweak integration test heartbeat periods

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #835
diff --git a/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp b/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
index e87819a..0425583 100644
--- a/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
+++ b/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
@@ -53,6 +53,7 @@
 #include "CoapServer.h"
 #include "io/BaseStream.h"
 #include "concurrentqueue.h"
+#include "utils/IntegrationTestUtils.h"
 
 class VerifyCoAPServer : public CoapIntegrationBase {
  public:
@@ -82,12 +83,12 @@
   }
 
   void runAssertions() {
-
-    std::this_thread::sleep_for(std::chrono::milliseconds(3000));
-    assert(LogTestController::getInstance().contains("Received ack. version 3. number of operations 1") == true);
-    assert(LogTestController::getInstance().contains("Received ack. version 3. number of operations 0") == true);
-    assert(LogTestController::getInstance().contains("Received error event from protocol") == true);
-    assert(LogTestController::getInstance().contains("Received op 1, with id id and operand operand") == true);
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(3),
+        "Received ack. version 3. number of operations 1",
+        "Received ack. version 3. number of operations 0",
+        "Received error event from protocol",
+        "Received op 1, with id id and operand operand"));
   }
 
   void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
index ceefc6d..1c4d264 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -51,6 +51,7 @@
     std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
 
     configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+    configuration->set("c2.agent.heartbeat.period", "200");
 
     std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
     content_repo->initialize(configuration);
@@ -74,12 +75,12 @@
 
     controller->load();
     controller->start();
-    waitToVerifyProcessor();
+
+    runAssertions();
 
     shutdownBeforeFlowController();
     controller->waitUnload(wait_time_);
     controller->stopC2();
-    runAssertions();
 
     cleanup();
   }
diff --git a/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp b/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp
index 40da9e2..975aa45 100644
--- a/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp
+++ b/extensions/expression-language/tests/integration/UpdateAttributeIntegrationTest.cpp
@@ -33,6 +33,7 @@
 #include "integration/IntegrationBase.h"
 #include "ProcessContextExpr.h"
 #include "TestBase.h"
+#include "utils/IntegrationTestUtils.h"
 
 class TestHarness : public IntegrationBase {
  public:
@@ -46,9 +47,11 @@
   void cleanup() override {}
 
   void runAssertions() override {
-    assert(LogTestController::getInstance().contains("key:route_check_attr value:good"));
-    assert(LogTestController::getInstance().contains("key:variable_attribute value:replacement_value"));
-    assert(LogTestController::getInstance().contains("ProcessSession rollback", std::chrono::seconds(1)) == false);  // No rollback happened
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+        "key:route_check_attr value:good",
+        "key:variable_attribute value:replacement_value"));
+    assert(false == verifyLogLinePresenceInPollTime(std::chrono::milliseconds(200), "ProcessSession rollback"));  // No rollback happened
   }
 
   void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) override {
diff --git a/extensions/http-curl/tests/C2FailedUpdateTest.cpp b/extensions/http-curl/tests/C2FailedUpdateTest.cpp
index cc18b3f..599a901 100644
--- a/extensions/http-curl/tests/C2FailedUpdateTest.cpp
+++ b/extensions/http-curl/tests/C2FailedUpdateTest.cpp
@@ -19,6 +19,7 @@
 #undef NDEBUG
 #include "HTTPIntegrationBase.h"
 #include "HTTPHandlers.h"
+#include "utils/IntegrationTestUtils.h"
 
 int main(int argc, char **argv) {
   const cmd_args args = parse_cmdline_args(argc, argv, "update");
diff --git a/extensions/http-curl/tests/C2JstackTest.cpp b/extensions/http-curl/tests/C2JstackTest.cpp
index 77a315f..3386fbb 100644
--- a/extensions/http-curl/tests/C2JstackTest.cpp
+++ b/extensions/http-curl/tests/C2JstackTest.cpp
@@ -21,11 +21,13 @@
 #include "TestBase.h"
 #include "HTTPIntegrationBase.h"
 #include "HTTPHandlers.h"
+#include "utils/IntegrationTestUtils.h"
 
 class VerifyC2DescribeJstack : public VerifyC2Describe {
  public:
   void runAssertions() override {
-    assert(LogTestController::getInstance().contains("SchedulingAgent"));
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_), "SchedulingAgent"));
   }
 };
 
diff --git a/extensions/http-curl/tests/C2NullConfiguration.cpp b/extensions/http-curl/tests/C2NullConfiguration.cpp
index 07ebeae..34db113 100644
--- a/extensions/http-curl/tests/C2NullConfiguration.cpp
+++ b/extensions/http-curl/tests/C2NullConfiguration.cpp
@@ -30,6 +30,7 @@
 #include "c2/C2Agent.h"
 #include "processors/LogAttribute.h"
 #include "HTTPIntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
 
 class VerifyC2Server : public HTTPIntegrationBase {
 public:
@@ -57,8 +58,10 @@
   }
 
   void runAssertions() override {
-    assert(LogTestController::getInstance().contains("C2Agent] [debug] Could not instantiate null"));
-    assert(LogTestController::getInstance().contains("Class is RESTSender"));
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+        "C2Agent] [debug] Could not instantiate null",
+        "Class is RESTSender"));
   }
 
   void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) override {
diff --git a/extensions/http-curl/tests/C2UpdateAgentTest.cpp b/extensions/http-curl/tests/C2UpdateAgentTest.cpp
index 53ce3d7..3ab4409 100644
--- a/extensions/http-curl/tests/C2UpdateAgentTest.cpp
+++ b/extensions/http-curl/tests/C2UpdateAgentTest.cpp
@@ -34,6 +34,6 @@
 
   const auto then = std::chrono::system_clock::now();
   const auto seconds = std::chrono::duration_cast<std::chrono::seconds>(then - start).count();
-  assert(handler.calls_ <= (seconds) + 1);
+  assert(handler.calls_ <= (seconds) + 2);
   return 0;
 }
diff --git a/extensions/http-curl/tests/C2UpdateTest.cpp b/extensions/http-curl/tests/C2UpdateTest.cpp
index 0475525..2c85a79 100644
--- a/extensions/http-curl/tests/C2UpdateTest.cpp
+++ b/extensions/http-curl/tests/C2UpdateTest.cpp
@@ -19,6 +19,7 @@
 #undef NDEBUG
 #include "HTTPIntegrationBase.h"
 #include "HTTPHandlers.h"
+#include "utils/IntegrationTestUtils.h"
 
 int main(int argc, char **argv) {
   const cmd_args args = parse_cmdline_args(argc, argv, "update");
diff --git a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
index 65c2821..40ccbd7 100644
--- a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
+++ b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
@@ -24,6 +24,7 @@
 #include "protocols/RESTReceiver.h"
 #include "HTTPIntegrationBase.h"
 #include "HTTPHandlers.h"
+#include "utils/IntegrationTestUtils.h"
 
 class LightWeightC2Handler : public HeartbeatHandler {
  public:
@@ -51,9 +52,11 @@
   }
 
   void runAssertions() override {
-    assert(LogTestController::getInstance().contains("Received Ack from Server"));
-    assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component invoke"));
-    assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController"));
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+        "Received Ack from Server",
+        "C2Agent] [debug] Stopping component invoke",
+        "C2Agent] [debug] Stopping component FlowController"));
   }
 
   void configureFullHeartbeat() override {
@@ -62,7 +65,7 @@
 };
 
 class VerifyLightWeightC2Heartbeat : public VerifyC2Heartbeat {
-public:
+ public:
   void configureFullHeartbeat() override {
     configuration->set("nifi.c2.full.heartbeat", "false");
   }
diff --git a/extensions/http-curl/tests/C2VerifyServeResults.cpp b/extensions/http-curl/tests/C2VerifyServeResults.cpp
index 2f9f36a..e814e7c 100644
--- a/extensions/http-curl/tests/C2VerifyServeResults.cpp
+++ b/extensions/http-curl/tests/C2VerifyServeResults.cpp
@@ -26,6 +26,7 @@
 #include "properties/Configure.h"
 #include "TestServer.h"
 #include "HTTPIntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
 
 class VerifyC2Server : public HTTPIntegrationBase {
 public:
@@ -49,8 +50,10 @@
   }
 
   void runAssertions() override {
-    assert(LogTestController::getInstance().contains("Import offset 0"));
-    assert(LogTestController::getInstance().contains("Outputting success and response"));
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+        "Import offset 0",
+        "Outputting success and response"));
   }
 
   void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) override {
diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
index 0917f50..1ada032 100644
--- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
+++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
@@ -36,6 +36,7 @@
 #include "unit/MockClasses.h"
 #include "unit/ProvenanceTestHelper.h"
 #include "integration/IntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
 
 REGISTER_RESOURCE(MockControllerService, "");
 REGISTER_RESOURCE(MockProcessor, "");
@@ -45,6 +46,7 @@
 }
 
 int main(int argc, char **argv) {
+  using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
   const cmd_args args = parse_cmdline_args(argc, argv);
 
   std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
@@ -106,35 +108,27 @@
   assert(ssl_client->getCACertificate().length() > 0);
   // now let's disable one of the controller services.
   std::shared_ptr<core::controller::ControllerServiceNode> cs_id = controller->getControllerServiceNode("ID");
+  const auto checkCsIdEnabledMatchesDisabledFlag = [&cs_id] { return !disabled == cs_id->enabled(); };
   assert(cs_id != nullptr);
   {
     std::lock_guard<std::mutex> lock(control_mutex);
-    controller->disableControllerService(cs_id);
-    disabled = true;
-    waitToVerifyProcessor();
-  }
-  {
-    std::lock_guard<std::mutex> lock(control_mutex);
     controller->enableControllerService(cs_id);
     disabled = false;
-    waitToVerifyProcessor();
   }
   std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995");
-  assert(cs_id->enabled());
-{
+  assert(verifyEventHappenedInPollTime(std::chrono::seconds(4), checkCsIdEnabledMatchesDisabledFlag));
+  {
     std::lock_guard<std::mutex> lock(control_mutex);
     controller->disableReferencingServices(mock_cont);
     disabled = true;
-    waitToVerifyProcessor();
   }
-    assert(cs_id->enabled() == false);
-{
+  assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag));
+  {
     std::lock_guard<std::mutex> lock(control_mutex);
     controller->enableReferencingServices(mock_cont);
     disabled = false;
-    waitToVerifyProcessor();
   }
-  assert(cs_id->enabled() == true);
+  assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag));
 
   controller->waitUnload(60000);
   return 0;
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index c8bbb69..9e1940a 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -24,6 +24,7 @@
 #include "protocols/RESTSender.h"
 #include "ServerAwareHandler.h"
 #include "TestBase.h"
+#include "utils/IntegrationTestUtils.h"
 #include "TestServer.h"
 
 int log_message(const struct mg_connection *conn, const char *message) {
@@ -130,6 +131,9 @@
   }
 
   void runAssertions() override {
+    // This class is never used for running assertions, but we are forced to wait for DescribeManifestHandler to verifyJsonHasAgentManifest
+    // if we were to log something on finished verification, we could poll on finding it 
+    std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_));
   }
 };
 
@@ -158,7 +162,8 @@
   }
 
   void runAssertions() override {
-    assert(LogTestController::getInstance().contains("Starting to reload Flow Controller with flow control name MiNiFi Flow, version"));
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(10), "Starting to reload Flow Controller with flow control name MiNiFi Flow, version"));
   }
 };
 
@@ -179,8 +184,8 @@
   }
 
   void runAssertions() override {
-    assert(LogTestController::getInstance().contains("removing file"));
-    assert(LogTestController::getInstance().contains("May not have command processor"));
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(10), "removing file", "May not have command processor"));
   }
 };
 
@@ -197,8 +202,8 @@
   }
 
   void runAssertions() override {
-    assert(LogTestController::getInstance().contains("Invalid configuration payload"));
-    assert(LogTestController::getInstance().contains("update failed"));
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(10), "Invalid configuration payload", "update failed."));
   }
 
   void cleanup() override {
diff --git a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
index febef4f..f504c90 100644
--- a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
+++ b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
@@ -65,7 +65,10 @@
 
   void cleanup() override {}
 
-  void runAssertions() override {}
+  void runAssertions() override {
+    // There is nothing to verify here, but we are expected to wait for all paralell events to execute 
+    std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_));
+  }
 
  protected:
   bool isSecure;
diff --git a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
index 0efffb5..b2dd31a 100644
--- a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
@@ -36,12 +36,17 @@
 #include "unit/ProvenanceTestHelper.h"
 #include "io/StreamFactory.h"
 #include "processors/LogAttribute.h"
-#include "HTTPIntegrationBase.h"
+#include "integration/IntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
 
-namespace {
-  void waitToVerifyProcessor() {
-    std::this_thread::sleep_for(std::chrono::seconds(10));
-  }
+int log_message(const struct mg_connection *conn, const char *message) {
+  puts(message);
+  return 1;
+}
+
+int ssl_enable(void* /*ssl_context*/, void* /*user_data*/) {
+  puts("Enable ssl");
+  return 0;
 }
 
 class HttpResponder : public CivetHandler {
@@ -59,6 +64,7 @@
 };
 
 int main(int argc, char **argv) {
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
   const cmd_args args = parse_cmdline_args(argc, argv);
 
   LogTestController::getInstance().setDebug<core::Processor>();
@@ -119,19 +125,19 @@
   }
   controller->load();
   controller->start();
-  waitToVerifyProcessor();
+  
+  assert(verifyLogLinePresenceInPollTime(
+      std::chrono::seconds(10),
+      "key:filename value:",
+      "key:invokehttp.request.url value:" + url,
+      "key:invokehttp.status.code value:200",
+      "key:flow.id"));
 
   controller->waitUnload(60000);
   if (url.find("localhost") == std::string::npos) {
     server.reset();
     exit(1);
   }
-  std::string logs = LogTestController::getInstance().log_output.str();
-
-  assert(logs.find("key:filename value:") != std::string::npos);
-  assert(logs.find("key:invokehttp.request.url value:" + url) != std::string::npos);
-  assert(logs.find("key:invokehttp.status.code value:200") != std::string::npos);
-  assert(logs.find("key:flow.id") != std::string::npos);
 
   LogTestController::getInstance().reset();
   return 0;
diff --git a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
index 71c2957..b36f811 100644
--- a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
@@ -28,6 +28,7 @@
 #include "core/ProcessGroup.h"
 #include "FlowController.h"
 #include "HTTPIntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
 
 class HttpTestHarness : public HTTPIntegrationBase {
 public:
@@ -65,9 +66,11 @@
   }
 
   void runAssertions() override {
-    assert(LogTestController::getInstance().contains("curl performed"));
-    assert(LogTestController::getInstance().contains("Size:1024 Offset:0"));
-    assert(!LogTestController::getInstance().contains("Size:0 Offset:0"));
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+      "curl performed",
+      "Size:1024 Offset:0"));
+    assert(false == verifyLogLinePresenceInPollTime(std::chrono::milliseconds(200), "Size:0 Offset:0"));
   }
 
  protected:
diff --git a/extensions/http-curl/tests/SiteToSiteRestTest.cpp b/extensions/http-curl/tests/SiteToSiteRestTest.cpp
index 2c58b9b..cfa09d7 100644
--- a/extensions/http-curl/tests/SiteToSiteRestTest.cpp
+++ b/extensions/http-curl/tests/SiteToSiteRestTest.cpp
@@ -30,6 +30,7 @@
 #include "core/ConfigurableComponent.h"
 #include "controllers/SSLContextService.h"
 #include "HTTPIntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
 
 class Responder : public ServerAwareHandler {
  public:
@@ -86,12 +87,13 @@
   }
 
   void runAssertions() override {
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
     if (isSecure) {
-      assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 1"));
+      assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_), "process group remote site2site port 10001, is secure 1"));
     } else {
-      assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 0"));
+      assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_), "process group remote site2site port 10001, is secure 0"));
     }
-    assert(LogTestController::getInstance().contains("ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed "));
+    assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_), "ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed "));
   }
 
  protected:
diff --git a/extensions/http-curl/tests/ThreadPoolAdjust.cpp b/extensions/http-curl/tests/ThreadPoolAdjust.cpp
index c06966e..fffbf10 100644
--- a/extensions/http-curl/tests/ThreadPoolAdjust.cpp
+++ b/extensions/http-curl/tests/ThreadPoolAdjust.cpp
@@ -28,6 +28,7 @@
 #include "FlowController.h"
 #include "HTTPIntegrationBase.h"
 #include "processors/LogAttribute.h"
+#include "utils/IntegrationTestUtils.h"
 
 class HttpTestHarness : public IntegrationBase {
  public:
@@ -64,9 +65,11 @@
   }
 
   void runAssertions() override {
-    assert(LogTestController::getInstance().contains("curl performed"));
-    assert(LogTestController::getInstance().contains("Size:1024 Offset:0"));
-    assert(!LogTestController::getInstance().contains("Size:0 Offset:0"));
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+      "curl performed",
+      "Size:1024 Offset:0"));
+    assert(false == verifyLogLinePresenceInPollTime(std::chrono::milliseconds(200), "Size:0 Offset:0"));
   }
 
  protected:
diff --git a/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
index f723793..99e5a01 100644
--- a/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
+++ b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
@@ -66,7 +66,10 @@
 
   void cleanup() override {}
 
-  void runAssertions() override {}
+  void runAssertions() override {
+    // There is nothing to verify here, but we are expected to wait for all paralell events to execute 
+    std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_));
+  }
 
 protected:
   bool isSecure;
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
index 74e5324..a0bf58b 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
+++ b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
@@ -26,9 +26,10 @@
 
 #include "CivetServer.h"
 #include "HTTPIntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
 
 class VerifyInvokeHTTP : public HTTPIntegrationBase {
-public:
+ public:
   VerifyInvokeHTTP()
       : HTTPIntegrationBase(6000) {
   }
@@ -55,7 +56,7 @@
     std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
 
     configuration->set(minifi::Configure::nifi_flow_configuration_file, flow_yml_path);
-
+    configuration->set("c2.agent.heartbeat.period", "200");
     std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
     content_repo->initialize(configuration);
     std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
@@ -79,7 +80,8 @@
     setupFlow(flow_yml_path);
     startFlowController();
 
-    waitToVerifyProcessor();
+    runAssertions();
+
     shutdownBeforeFlowController();
     stopFlowController();
   }
@@ -92,47 +94,55 @@
     flowController_->unload();
     flowController_->stopC2();
 
-    runAssertions();
     cleanup();
   }
 };
 
 class VerifyInvokeHTTPOKResponse : public VerifyInvokeHTTP {
-public:
+ public:
   void runAssertions() override {
-    assert(LogTestController::getInstance().contains("key:invokehttp.status.code value:201"));
-    assert(LogTestController::getInstance().contains("response code 201"));
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(6),
+        "key:invokehttp.status.code value:201",
+        "response code 201"));
   }
 };
 
 class VerifyCouldNotConnectInvokeHTTP : public VerifyInvokeHTTP {
-public:
+ public:
   void runAssertions() override {
-    assert(LogTestController::getInstance().contains("key:invoke_http value:failure"));
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(6), "key:invoke_http value:failure"));
   }
 };
 
 class VerifyNoRetryInvokeHTTP : public VerifyInvokeHTTP {
-public:
+ public:
   void runAssertions() override {
-    assert(LogTestController::getInstance().contains("key:invokehttp.status.message value:HTTP/1.1 404 Not Found"));
-    assert(LogTestController::getInstance().contains("isSuccess: 0, response code 404"));
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(6),
+        "key:invokehttp.status.message value:HTTP/1.1 404 Not Found",
+        "isSuccess: 0, response code 404"));
   }
 };
 
 class VerifyRetryInvokeHTTP : public VerifyInvokeHTTP {
-public:
+ public:
   void runAssertions() override {
-    assert(LogTestController::getInstance().contains("key:invokehttp.status.message value:HTTP/1.1 501 Not Implemented"));
-    assert(LogTestController::getInstance().contains("isSuccess: 0, response code 501"));
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(6),
+        "key:invokehttp.status.message value:HTTP/1.1 501 Not Implemented",
+        "isSuccess: 0, response code 501"));
   }
 };
 
 class VerifyRWTimeoutInvokeHTTP : public VerifyInvokeHTTP {
-public:
+ public:
   void runAssertions() override {
-    assert(LogTestController::getInstance().contains("key:invoke_http value:failure"));
-    assert(LogTestController::getInstance().contains("limit (1000ms) reached, terminating connection"));
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(6),
+        "key:invoke_http value:failure",
+        "limit (1000ms) reached, terminating connection"));
   }
 };
 
@@ -160,7 +170,7 @@
     harness.setupFlow(args.test_file);
     harness.shutdownBeforeFlowController();
     harness.startFlowController();
-    harness.waitToVerifyProcessor();
+    harness.runAssertions();
     harness.stopFlowController();
   }
 
@@ -183,7 +193,7 @@
   }
 
   {
-    TimeoutingHTTPHandler handler({std::chrono::seconds(4)});
+    TimeoutingHTTPHandler handler({std::chrono::seconds(2)});
     VerifyRWTimeoutInvokeHTTP harness;
     run(harness, args.url, args.test_file, args.key_dir, &handler);
   }
diff --git a/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp b/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
index 0c65035..8505ce6 100644
--- a/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
+++ b/extensions/librdkafka/tests/PublishKafkaOnScheduleTests.cpp
@@ -23,26 +23,36 @@
 #include "../../../libminifi/test/TestBase.h"
 #include "../PublishKafka.h"
 #include "utils/StringUtils.h"
+#include "utils/IntegrationTestUtils.h"
 
 class PublishKafkaOnScheduleTests : public IntegrationBase {
  public:
     virtual void runAssertions() {
-      std::string logs = LogTestController::getInstance().log_output.str();
+      using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+      assert(verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
+        const std::string logs = LogTestController::getInstance().log_output.str();
+        const auto result = utils::StringUtils::countOccurrences(logs, "value 1 is outside allowed range 1000..1000000000");
+        const int occurrences = result.second;
+        return 1 < occurrences;
+      }));
+      flowController_->updatePropertyValue("kafka", minifi::processors::PublishKafka::MaxMessageSize.getName(), "1999");
+      const std::vector<std::string> must_appear_byorder_msgs = {"notifyStop called",
+          "Successfully configured PublishKafka",
+          "PublishKafka onTrigger"};
 
-      auto result = utils::StringUtils::countOccurrences(logs, "value 1 is outside allowed range 1000..1000000000");
-      size_t last_pos = result.first;
-      int occurrences = result.second;
-
-      assert(occurrences > 1);  // Verify retry of onSchedule and onUnSchedule calls
-
-      std::vector<std::string> must_appear_byorder_msgs = {"notifyStop called",
-                                                           "Successfully configured PublishKafka",
-                                                           "PublishKafka onTrigger"};
-
-      for (const auto &msg : must_appear_byorder_msgs) {
-        last_pos = logs.find(msg, last_pos);
-        assert(last_pos != std::string::npos);
-      }
+      const bool test_success = verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
+        const std::string logs = LogTestController::getInstance().log_output.str();
+        const auto result = utils::StringUtils::countOccurrences(logs, "value 1 is outside allowed range 1000..1000000000");
+        size_t last_pos = result.first;
+        for (const std::string& msg : must_appear_byorder_msgs) {
+          last_pos = logs.find(msg, last_pos);
+          if (last_pos == std::string::npos)  {
+            return false;
+          }
+        }
+        return true;
+      });
+      assert(test_success);
     }
 
     virtual void testSetup() {
@@ -52,12 +62,6 @@
       LogTestController::getInstance().setDebug<minifi::processors::PublishKafka>();
     }
 
-    virtual void waitToVerifyProcessor() {
-      std::this_thread::sleep_for(std::chrono::seconds(3));
-      flowController_->updatePropertyValue("kafka", minifi::processors::PublishKafka::MaxMessageSize.getName(), "1999");
-      std::this_thread::sleep_for(std::chrono::seconds(3));
-    }
-
     virtual void cleanup() {}
 };
 
diff --git a/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp b/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp
index 96982ae..32fe6c6 100644
--- a/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp
+++ b/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp
@@ -49,6 +49,7 @@
 #include "processors/LogAttribute.h"
 #include "io/tls/TLSSocket.h"
 #include "io/tls/TLSServerSocket.h"
+#include "utils/IntegrationTestUtils.h"
 
 class SecureSocketTest : public IntegrationBase {
  public:
@@ -76,9 +77,10 @@
   }
 
   void runAssertions() override {
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
     isRunning_ = false;
     server_socket_.reset();
-    assert(LogTestController::getInstance().contains("send succeed 20"));
+    assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_), "send succeed 20"));
   }
 
   void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) override {
diff --git a/extensions/standard-processors/tests/integration/TailFileTest.cpp b/extensions/standard-processors/tests/integration/TailFileTest.cpp
index 7f1cb85..9229677 100644
--- a/extensions/standard-processors/tests/integration/TailFileTest.cpp
+++ b/extensions/standard-processors/tests/integration/TailFileTest.cpp
@@ -41,6 +41,7 @@
 #include "processors/LogAttribute.h"
 #include "state/ProcessorController.h"
 #include "integration/IntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
 
 class TailFileTestHarness : public IntegrationBase {
  public:
@@ -69,9 +70,11 @@
   }
 
   void runAssertions() override {
-    assert(LogTestController::getInstance().contains("5 flowfiles were received from TailFile input") == true);
-    assert(LogTestController::getInstance().contains("Looking for delimiter 0xA") == true);
-    assert(LogTestController::getInstance().contains("li\\ne5") == true);
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+        "5 flowfiles were received from TailFile input",
+        "Looking for delimiter 0xA",
+        "li\\ne5"));
   }
 
  protected:
diff --git a/extensions/standard-processors/tests/unit/GetFileTests.cpp b/extensions/standard-processors/tests/unit/GetFileTests.cpp
index 273977c..53e393f 100644
--- a/extensions/standard-processors/tests/unit/GetFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/GetFileTests.cpp
@@ -66,8 +66,6 @@
   in_file_stream << "The quick brown fox jumps over the lazy dog" << std::endl;
   in_file_stream.close();
 
-  std::this_thread::sleep_for(std::chrono::seconds(2));
-
   in_file_stream.open(in_file + "2");
   in_file_stream << "The quick brown fox jumps over the lazy dog who is 2 legit to quit" << std::endl;
   in_file_stream.close();
diff --git a/libminifi/include/utils/IntegrationTestUtils.h b/libminifi/include/utils/IntegrationTestUtils.h
new file mode 100644
index 0000000..9c3db9d
--- /dev/null
+++ b/libminifi/include/utils/IntegrationTestUtils.h
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <utility>
+#include <string>
+#include <vector>
+
+#include "../../../libminifi/test/TestBase.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template <class Rep, class Period, typename Fun>
+bool verifyEventHappenedInPollTime(const std::chrono::duration<Rep, Period>& wait_duration, Fun&& check) {
+  std::chrono::system_clock::time_point wait_end = std::chrono::system_clock::now() + wait_duration;
+  do {
+    if (std::forward<Fun>(check)()) {
+      return true;
+    }
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+  } while (std::chrono::system_clock::now() < wait_end);
+  return false;
+}
+
+template <class Rep, class Period, typename ...String>
+bool verifyLogLinePresenceInPollTime(const std::chrono::duration<Rep, Period>& wait_duration, String&&... patterns) {
+  // gcc before 4.9 does not support capturing parameter packs in lambdas: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=41933
+  // Once we support gcc >= 4.9 only, this vector will no longer be necessary, we'll be able to iterate on the parameter pack directly.
+  std::vector<std::string> pattern_list{std::forward<String>(patterns)...};
+  auto check = [&] {
+    const std::string logs = LogTestController::getInstance().log_output.str();
+    return std::all_of(pattern_list.cbegin(), pattern_list.cend(), [&logs] (const std::string& pattern) { return logs.find(pattern) != std::string::npos; });
+  };
+  return verifyEventHappenedInPollTime(wait_duration, check);
+}
+
+}  // namespace utils
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 65a4a16..3a24eef 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -55,10 +55,6 @@
 
   virtual void runAssertions() = 0;
 
-  virtual void waitToVerifyProcessor() {
-    std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_));
-  }
-
  protected:
 
   virtual void configureC2() {
@@ -134,13 +130,13 @@
   flowController_->load();
   updateProperties(flowController_);
   flowController_->start();
-  waitToVerifyProcessor();
+
+  runAssertions();
 
   shutdownBeforeFlowController();
   flowController_->unload();
   flowController_->stopC2();
 
-  runAssertions();
   cleanup();
 }
 
diff --git a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
index 285157d..8b3d28e 100644
--- a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
+++ b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
@@ -26,31 +26,42 @@
 #include "../TestBase.h"
 #include "../KamikazeProcessor.h"
 #include "utils/StringUtils.h"
+#include "utils/IntegrationTestUtils.h"
 
 /*Verify behavior in case exceptions are thrown in onSchedule or onTrigger functions
  * KamikazeProcessor is a test processor to trigger errors in these functions */
 class KamikazeErrorHandlingTests : public IntegrationBase {
  public:
   void runAssertions() override {
-    std::string logs = LogTestController::getInstance().log_output.str();
+    using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
+    assert(verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
+      const std::string logs = LogTestController::getInstance().log_output.str();
+      const auto result = utils::StringUtils::countOccurrences(logs, minifi::processors::KamikazeProcessor::OnScheduleExceptionStr);
+      const int occurrences = result.second;
+      return 1 < occurrences;
+    }));
+    flowController_->updatePropertyValue("kamikaze", minifi::processors::KamikazeProcessor::ThrowInOnSchedule.getName(), "false");
 
-    auto result = utils::StringUtils::countOccurrences(logs, minifi::processors::KamikazeProcessor::OnScheduleExceptionStr);
-    size_t last_pos = result.first;
-    int occurrences = result.second;
-
-    assert(occurrences > 1);  // Verify retry of onSchedule and onUnSchedule calls
-
-    std::vector<std::string> must_appear_byorder_msgs = {minifi::processors::KamikazeProcessor::OnUnScheduleLogStr,
+    const std::vector<std::string> must_appear_byorder_msgs = {minifi::processors::KamikazeProcessor::OnUnScheduleLogStr,
                                                  minifi::processors::KamikazeProcessor::OnScheduleLogStr,
                                                  minifi::processors::KamikazeProcessor::OnTriggerExceptionStr,
                                                  "[warning] ProcessSession rollback for kamikaze executed"};
 
-    for (const auto &msg : must_appear_byorder_msgs) {
-      last_pos = logs.find(msg, last_pos);
-      assert(last_pos != std::string::npos);
-    }
+    const bool test_success = verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
+      const std::string logs = LogTestController::getInstance().log_output.str();
+      const auto result = utils::StringUtils::countOccurrences(logs, minifi::processors::KamikazeProcessor::OnScheduleExceptionStr);
+      size_t last_pos = result.first;
+      for (const std::string& msg : must_appear_byorder_msgs) {
+        last_pos = logs.find(msg, last_pos);
+        if (last_pos == std::string::npos)  {
+          return false;
+        }
+      }
+      return true;
+    });
+    assert(test_success);
 
-    assert(logs.find(minifi::processors::KamikazeProcessor::OnTriggerLogStr) == std::string::npos);
+    assert(LogTestController::getInstance().log_output.str().find(minifi::processors::KamikazeProcessor::OnTriggerLogStr) == std::string::npos);
   }
 
   void testSetup() override {
@@ -60,12 +71,6 @@
     LogTestController::getInstance().setDebug<minifi::processors::KamikazeProcessor>();
   }
 
-  void waitToVerifyProcessor() override {
-    std::this_thread::sleep_for(std::chrono::seconds(3));
-    flowController_->updatePropertyValue("kamikaze", minifi::processors::KamikazeProcessor::ThrowInOnSchedule.getName(), "false");
-    std::this_thread::sleep_for(std::chrono::seconds(3));
-  }
-
   void cleanup() override {}
 };
 
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp
index c968230..ba9d496 100644
--- a/libminifi/test/integration/ProvenanceReportingTest.cpp
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -38,12 +38,11 @@
 #include "../unit/ProvenanceTestHelper.h"
 #include "io/StreamFactory.h"
 #include "../TestBase.h"
-
-void waitToVerifyProcessor() {
-  std::this_thread::sleep_for(std::chrono::seconds(2));
-}
+#include "utils/IntegrationTestUtils.h"
 
 int main(int argc, char **argv) {
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+
   std::string test_file_location;
   if (argc > 1) {
     test_file_location = argv[1];
@@ -80,11 +79,10 @@
 
   controller->load();
   controller->start();
-  waitToVerifyProcessor();
+
+  assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(std::chrono::seconds(2)), "Add processor SiteToSiteProvenanceReportingTask into process group MiNiFi Flow"));
 
   controller->waitUnload(60000);
-  std::string logs = LogTestController::getInstance().log_output.str();
-  assert(logs.find("Add processor SiteToSiteProvenanceReportingTask into process group MiNiFi Flow") != std::string::npos);
   LogTestController::getInstance().reset();
   rmdir("./content_repository");
   rmdir("/tmp/aljs39/");
diff --git a/libminifi/test/pcap-tests/PcapTest.cpp b/libminifi/test/pcap-tests/PcapTest.cpp
index eea0364..2cbaa35 100644
--- a/libminifi/test/pcap-tests/PcapTest.cpp
+++ b/libminifi/test/pcap-tests/PcapTest.cpp
@@ -43,6 +43,7 @@
 #include "core/state/ProcessorController.h"
 #include "../integration/IntegrationBase.h"
 #include "CapturePacket.h"
+#include "utils/IntegrationTestUtils.h"
 
 class PcapTestHarness : public IntegrationBase {
  public:
@@ -66,10 +67,13 @@
   }
 
   void runAssertions() {
-    assert(LogTestController::getInstance().contains("Starting capture") == true);
-    assert(LogTestController::getInstance().contains("Stopping capture") == true);
-    assert(LogTestController::getInstance().contains("Stopped device capture. clearing queues") == true);
-    assert(LogTestController::getInstance().contains("Accepting ") == true && LogTestController::getInstance().contains("because it matches .*") );
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+        "Starting capture",
+        "Stopping capture",
+        "Stopped device capture. clearing queues",
+        "Accepting ",
+        "because it matches .*"));
   }
 
   void updateProperties(std::shared_ptr<minifi::FlowController> fc) {
diff --git a/libminifi/test/sensors-tests/SensorTests.cpp b/libminifi/test/sensors-tests/SensorTests.cpp
index 40047f5..83fbc4d 100644
--- a/libminifi/test/sensors-tests/SensorTests.cpp
+++ b/libminifi/test/sensors-tests/SensorTests.cpp
@@ -42,6 +42,7 @@
 #include "core/ConfigurableComponent.h"
 #include "../integration/IntegrationBase.h"
 #include "GetEnvironmentalSensors.h"
+#include "utils/IntegrationTestUtils.h"
 
 class PcapTestHarness : public IntegrationBase {
  public:
@@ -64,7 +65,8 @@
   }
 
   void runAssertions() {
-    assert(LogTestController::getInstance().contains("Initializing EnvironmentalSensors") == true);
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_), "Initializing EnvironmentalSensors"));
   }
 
   void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
diff --git a/libminifi/test/unit/BackTraceTests.cpp b/libminifi/test/unit/BackTraceTests.cpp
index 37e8aaa..19ccedf 100644
--- a/libminifi/test/unit/BackTraceTests.cpp
+++ b/libminifi/test/unit/BackTraceTests.cpp
@@ -75,7 +75,7 @@
 std::atomic<int> counter;
 
 int counterFunction() {
-  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+  std::this_thread::sleep_for(std::chrono::milliseconds(150));
   return ++counter;
 }
 
@@ -83,7 +83,7 @@
   counter = 0;
   utils::ThreadPool<int> pool(4);
   pool.start();
-  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+  std::this_thread::sleep_for(std::chrono::milliseconds(150));
   for (int i = 0; i < 3; i++) {
     std::function<int()> f_ex = counterFunction;
     std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new WorkerNumberExecutions(5));
diff --git a/libminifi/test/unit/MinifiConcurrentQueueTests.cpp b/libminifi/test/unit/MinifiConcurrentQueueTests.cpp
index 76b7642..1b0dc0f 100644
--- a/libminifi/test/unit/MinifiConcurrentQueueTests.cpp
+++ b/libminifi/test/unit/MinifiConcurrentQueueTests.cpp
@@ -26,25 +26,10 @@
 #include "../TestBase.h"
 #include "utils/MinifiConcurrentQueue.h"
 #include "utils/StringUtils.h"
+#include "utils/IntegrationTestUtils.h"
 
 namespace utils = org::apache::nifi::minifi::utils;
 
-namespace {
-
-  template<typename Function, typename Rep, typename Period>
-  bool becomesTrueWithinTimeout(const Function &condition, std::chrono::duration<Rep, Period> timeout) {
-    auto start_time = std::chrono::steady_clock::now();
-    while (std::chrono::steady_clock::now() < start_time + timeout) {
-      if (condition()) {
-        return true;
-      }
-      std::this_thread::sleep_for(std::chrono::milliseconds{1});
-    }
-    return false;
-  }
-
-}  // namespace
-
 namespace MinifiConcurrentQueueTestProducersConsumers {
 
   // Producers
@@ -75,7 +60,7 @@
 
   std::thread getSimpleTryDequeConsumerThread(utils::ConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
     return std::thread([&queue, &results] {
-      constexpr std::size_t max_read_attempts = 1000;
+      constexpr std::size_t max_read_attempts = 300;
       for (std::size_t attempt_num = 0; results.size() < 3 && attempt_num < max_read_attempts; ++attempt_num) {
         std::string s;
         if (queue.tryDequeue(s)) {
@@ -89,7 +74,7 @@
 
   std::thread getSimpleConsumeConsumerThread(utils::ConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
     return std::thread([&queue, &results] {
-      constexpr std::size_t max_read_attempts = 1000;
+      constexpr std::size_t max_read_attempts = 300;
       for (std::size_t attempt_num = 0; results.size() < 3 && attempt_num < max_read_attempts; ++attempt_num) {
         if (!queue.consume([&results] (const std::string& s) { results.push_back(s); })) {
           std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -146,7 +131,7 @@
 
   std::thread getDequeueWaitForConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
     return std::thread([&queue, &results] {
-      constexpr std::size_t max_read_attempts = 1000;
+      constexpr std::size_t max_read_attempts = 300;
       for (std::size_t attempt_num = 0; results.size() < 3 && attempt_num < max_read_attempts; ++attempt_num) {
         std::string s;
         if (queue.dequeueWaitFor(s, std::chrono::milliseconds(1))) {
@@ -158,7 +143,7 @@
 
   std::thread getDequeueWaitUntilConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
     return std::thread([&queue, &results] {
-      constexpr std::size_t max_read_attempts = 1000;
+      constexpr std::size_t max_read_attempts = 300;
       for (std::size_t attempt_num = 0; results.size() < 3 && attempt_num < max_read_attempts; ++attempt_num) {
         std::string s;
         const std::chrono::system_clock::time_point timeout_point = std::chrono::system_clock::now() + std::chrono::milliseconds(1);
@@ -171,7 +156,7 @@
 
   std::thread getConsumeWaitForConsumerThread(utils::ConditionConcurrentQueue<std::string>& queue, std::vector<std::string>& results) {
     return std::thread([&queue, &results]() {
-      constexpr std::size_t max_read_attempts = 1000;
+      constexpr std::size_t max_read_attempts = 300;
       for (std::size_t attempt_num = 0; results.size() < 3 && attempt_num < max_read_attempts; ++attempt_num) {
         queue.consumeWaitFor([&results] (const std::string& s) { results.push_back(s); }, std::chrono::milliseconds(1));
       }
@@ -311,7 +296,7 @@
   producer.join();
 
   auto queue_is_empty = [&queue]() { return queue.empty(); };
-  REQUIRE(becomesTrueWithinTimeout(queue_is_empty, std::chrono::seconds{1}));
+  REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds(1), queue_is_empty));
 
   queue.stop();
   consumer.join();
@@ -330,7 +315,7 @@
   std::thread producer { getSimpleProducerThread(queue) };
 
   auto we_have_all_results = [&results_size]() { return results_size >= 3; };
-  REQUIRE(becomesTrueWithinTimeout(we_have_all_results, std::chrono::seconds{1}));
+  REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds(1), we_have_all_results));
 
   queue.stop();
   producer.join();
@@ -366,7 +351,7 @@
   std::mt19937 rng(dev());
   std::uniform_int_distribution<std::mt19937::result_type> dist(1, std::numeric_limits<int>::max());
 
-  std::vector<int> source(1000000);
+  std::vector<int> source(50000);
   std::vector<int> target;
 
   generate(source.begin(), source.end(), [&rng, &dist](){ return dist(rng); });
@@ -380,7 +365,7 @@
 
   std::thread relay([&queue, &cqueue]() {
     size_t cnt = 0;
-    while (cnt < 1000000) {
+    while (cnt < 50000) {
       int i;
       if (queue.tryDequeue(i)) {
         cnt++;
@@ -400,7 +385,7 @@
   relay.join();
 
   auto queue_is_empty = [&cqueue]() { return cqueue.empty(); };
-  REQUIRE(becomesTrueWithinTimeout(queue_is_empty, std::chrono::seconds{1}));
+  REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::seconds(1), queue_is_empty));
 
   cqueue.stop();
   consumer.join();