DISPATCH-2200 Add basic TCP adaptor microbenchmark w/ 2 routers (#1303)
* reset the dispatch pointer; just for cleanliness, not actually necessary to do
* basically working tests; except one was getting stuck (before I rebased the PR)
* move test setup around; introduce helper to run router in a subprocess
* delete the two routers in a thread benchmark case; that cannot work due to dispatch's use of global variables
* call qd_entity_cache_free_entries; this should not be forgotten
diff --git a/src/python_embedded.c b/src/python_embedded.c
index 876b2f3..f508786 100644
--- a/src/python_embedded.c
+++ b/src/python_embedded.c
@@ -69,6 +69,7 @@
dispatch_module = 0;
PyGC_Collect();
Py_Finalize();
+ dispatch = 0;
}
diff --git a/tests/c_benchmarks/bm_tcp_adapter.cpp b/tests/c_benchmarks/bm_tcp_adapter.cpp
index 06a4aae..77452ed 100644
--- a/tests/c_benchmarks/bm_tcp_adapter.cpp
+++ b/tests/c_benchmarks/bm_tcp_adapter.cpp
@@ -25,7 +25,12 @@
#include "echo_server.hpp"
#include <benchmark/benchmark.h>
+#include <linux/prctl.h>
+#include <sys/prctl.h>
+#include <sys/wait.h>
+#include <unistd.h>
+#include <csignal>
#include <iostream>
extern "C" {
@@ -39,6 +44,119 @@
void qd_error_initialize();
} // extern "C"
+static unsigned short findFreePort()
+{
+ TCPServerSocket serverSocket(0);
+ unsigned short port = serverSocket.getLocalPort();
+ return port;
+}
+
+static std::stringstream oneRouterTcpConfig(const unsigned short tcpConnectorPort, unsigned short tcpListenerPort)
+{
+ std::stringstream router_config;
+ router_config << R"END(
+router {
+ mode: standalone
+ id : QDR
+}
+
+listener {
+ port : 0
+}
+
+tcpListener {
+ host : 0.0.0.0
+ port : )END" << tcpListenerPort
+ << R"END(
+ address : ES
+ siteId : siteId
+}
+
+tcpConnector {
+ host : 127.0.0.1
+ port : )END" << tcpConnectorPort
+ << R"END(
+ address : ES
+ siteId : siteId
+}
+
+log {
+ module: DEFAULT
+ enable: warning+
+})END";
+
+ return router_config;
+}
+
+static std::stringstream multiRouterTcpConfig(std::string routerId, const std::vector<unsigned short> listenerPorts,
+ const std::vector<unsigned short> connectorPorts,
+ unsigned short tcpConnectorPort, unsigned short tcpListenerPort)
+{
+ std::stringstream router_config;
+ router_config << R"END(
+router {
+ mode: interior
+ id : )END" << routerId
+ << R"END(
+})END";
+
+ for (auto connectorPort : connectorPorts) {
+ router_config << R"END(
+connector {
+ host: 127.0.0.1
+ port : )END" << connectorPort
+ << R"END(
+ role: inter-router
+})END";
+ }
+
+ for (auto listenerPort : listenerPorts) {
+ router_config << R"END(
+listener {
+ host: 0.0.0.0
+ port : )END" << listenerPort
+ << R"END(
+ role: inter-router
+ })END";
+ }
+
+ if (tcpListenerPort != 0) {
+ router_config << R"END(
+tcpListener {
+ host : 0.0.0.0
+ port : )END" << tcpListenerPort
+ << R"END(
+ address : ES
+ siteId : siteId
+})END";
+ }
+ if (tcpConnectorPort != 0) {
+ router_config << R"END(
+tcpConnector {
+ host : 127.0.0.1
+ port : )END" << tcpConnectorPort
+ << R"END(
+ address : ES
+ siteId : siteId
+})END";
+ }
+
+ router_config << R"END(
+log {
+ module: DEFAULT
+ enable: warn+
+})END";
+
+ return router_config;
+}
+
+static void writeRouterConfig(const std::string &configName, const std::stringstream &router_config)
+{
+ std::fstream f(configName, std::ios::out);
+ f << router_config.str();
+ f.close();
+}
+
static TCPSocket try_to_connect(const std::string &servAddress, int echoServPort)
{
auto then = std::chrono::steady_clock::now();
@@ -103,3 +221,143 @@
}
BENCHMARK(BM_TCPEchoServerLatencyWithoutQDR)->Unit(benchmark::kMillisecond);
+
+class DispatchRouterThreadTCPLatencyTest
+{
+ QDR mQdr{};
+ std::thread mT;
+
+ public:
+ DispatchRouterThreadTCPLatencyTest(const std::string configName)
+ {
+ Latch mx;
+
+ mT = std::thread([&mx, this, &configName]() {
+ mQdr.initialize(configName);
+ mQdr.wait();
+
+ mx.notify();
+ mQdr.run();
+
+ mQdr.deinitialize(false);
+ });
+
+ mx.wait();
+ }
+
+ ~DispatchRouterThreadTCPLatencyTest()
+ {
+ mQdr.stop();
+ mT.join();
+ }
+};
+
+class DispatchRouterSubprocessTcpLatencyTest
+{
+ int pid;
+
+ public:
+ DispatchRouterSubprocessTcpLatencyTest(std::string configName)
+ {
+ pid = fork();
+ if (pid == 0) {
+ // https://stackoverflow.com/questions/10761197/prctlpr-set-pdeathsig-signal-is-called-on-parent-thread-exit-not-parent-proc
+ prctl(PR_SET_PDEATHSIG, SIGHUP);
+ QDR qdr{};
+ qdr.initialize(configName);
+ qdr.wait();
+
+ qdr.run(); // this never returns until signal is sent, and then process dies
+ }
+ }
+
+ ~DispatchRouterSubprocessTcpLatencyTest()
+ {
+ int ret = kill(pid, SIGTERM);
+ if (ret != 0) {
+ perror("Killing router");
+ }
+ int status;
+ ret = waitpid(pid, &status, 0);
+ if (ret != pid) {
+ perror("Waiting for child");
+ }
+ }
+};
+
+static void BM_TCPEchoServerLatency1QDRThread(benchmark::State &state)
+{
+ auto est = make_unique<EchoServerThread>();
+ unsigned short tcpConnectorPort = est->port();
+ unsigned short tcpListenerPort = findFreePort();
+
+ std::string configName = "BM_TCPEchoServerLatency1QDRThread";
+ std::stringstream router_config = oneRouterTcpConfig(tcpConnectorPort, tcpListenerPort);
+ writeRouterConfig(configName, router_config);
+
+ DispatchRouterThreadTCPLatencyTest drt{configName};
+
+ {
+ LatencyMeasure lm;
+ lm.latencyMeasureLoop(state, tcpListenerPort);
+ }
+ // kill echo server first
+ // when dispatch is stopped first, echo server then sometimes hangs on socket recv, and dispatch leaks more
+ // (suppressed leaks):
+ /*
+ 76: Assertion `leak_reports.length() == 0` failed in ../tests/c_benchmarks/../c_unittests/helpers.hpp line 136:
+ alloc.c: Items of type 'qd_buffer_t' remain allocated at shutdown: 3 (SUPPRESSED)
+ 76: alloc.c: Items of type 'qd_message_t' remain allocated at shutdown: 2 (SUPPRESSED)
+ 76: alloc.c: Items of type 'qd_message_content_t' remain allocated at shutdown: 2 (SUPPRESSED)
+ 76: alloc.c: Items of type 'qdr_delivery_t' remain allocated at shutdown: 2 (SUPPRESSED)
+ */
+ est.reset();
+}
+
+BENCHMARK(BM_TCPEchoServerLatency1QDRThread)->Unit(benchmark::kMillisecond);
+
+static void BM_TCPEchoServerLatency1QDRSubprocess(benchmark::State &state)
+{
+ EchoServerThread est;
+ unsigned short tcpConnectorPort = est.port();
+ unsigned short tcpListenerPort = findFreePort();
+
+ std::string configName = "BM_TCPEchoServerLatency1QDRSubprocess.conf";
+ std::stringstream router_config = oneRouterTcpConfig(tcpConnectorPort, tcpListenerPort);
+ writeRouterConfig(configName, router_config);
+
+ DispatchRouterSubprocessTcpLatencyTest drt(configName);
+
+ {
+ LatencyMeasure lm;
+ lm.latencyMeasureLoop(state, tcpListenerPort);
+ }
+}
+
+BENCHMARK(BM_TCPEchoServerLatency1QDRSubprocess)->Unit(benchmark::kMillisecond);
+
+static void BM_TCPEchoServerLatency2QDRSubprocess(benchmark::State &state)
+{
+ EchoServerThread est;
+
+ unsigned short listener_2 = findFreePort();
+ unsigned short tcpListener_2 = findFreePort();
+
+ std::string configName_1 = "BM_TCPEchoServerLatency2QDRSubprocess_1.conf";
+ std::stringstream router_config_1 = multiRouterTcpConfig("QDRL1", {}, {listener_2}, est.port(), 0);
+ writeRouterConfig(configName_1, router_config_1);
+
+ std::string configName_2 = "BM_TCPEchoServerLatency2QDRSubprocess_2.conf";
+ std::stringstream router_config_2 = multiRouterTcpConfig("QDRL2", {listener_2}, {}, 0, tcpListener_2);
+ writeRouterConfig(configName_2, router_config_2);
+
+ DispatchRouterSubprocessTcpLatencyTest qdr1{configName_1};
+ DispatchRouterSubprocessTcpLatencyTest qdr2{configName_2};
+
+ {
+ LatencyMeasure lm;
+ lm.latencyMeasureLoop(state, tcpListener_2);
+ }
+}
+
+BENCHMARK(BM_TCPEchoServerLatency2QDRSubprocess)->Unit(benchmark::kMillisecond);
diff --git a/tests/c_benchmarks/echo_server.hpp b/tests/c_benchmarks/echo_server.hpp
index 3b9c3a6..50ff59c 100644
--- a/tests/c_benchmarks/echo_server.hpp
+++ b/tests/c_benchmarks/echo_server.hpp
@@ -108,7 +108,7 @@
u.join();
}
- unsigned short port()
+ unsigned short port() const
{
return echoServerPort;
}
diff --git a/tests/c_unittests/helpers.hpp b/tests/c_unittests/helpers.hpp
index 6517b79..899a00c 100644
--- a/tests/c_unittests/helpers.hpp
+++ b/tests/c_unittests/helpers.hpp
@@ -73,6 +73,13 @@
template <class T>
using remove_const_t = typename std::remove_const<T>::type;
+// https://stackoverflow.com/questions/17902405/how-to-implement-make-unique-function-in-c11
+template<typename T, typename... Args>
+std::unique_ptr<T> make_unique(Args&&... args)
+{
+ return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
+}
+
// https://stackoverflow.com/questions/27440953/stdunique-ptr-for-c-functions-that-need-free
struct free_deleter {
template <typename T>
@@ -278,6 +285,7 @@
{
qd_log_finalize();
qd_alloc_finalize();
+ qd_entity_cache_free_entries();
}
};