blob: 77452ed7fcc67cf6ba3a6b78cdfb6d8c0229087e [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "../c_unittests/helpers.hpp"
#include "SocketException.hpp"
#include "TCPSocket.hpp"
#include "echo_server.hpp"
#include <benchmark/benchmark.h>
#include <linux/prctl.h>
#include <sys/prctl.h>
#include <sys/wait.h>
#include <unistd.h>
#include <csignal>
#include <iostream>
extern "C" {
#include "entity_cache.h"
#include "log_private.h"
#include "parse_tree.h"
#include "qpid/dispatch.h"
// declarations that don't have .h file
void qd_error_initialize();
} // extern "C"
static unsigned short findFreePort()
{
TCPServerSocket serverSocket(0);
unsigned short port = serverSocket.getLocalPort();
return port;
}
static std::stringstream oneRouterTcpConfig(const unsigned short tcpConnectorPort, unsigned short tcpListenerPort)
{
std::stringstream router_config;
router_config << R"END(
router {
mode: standalone
id : QDR
}
listener {
port : 0
}
tcpListener {
host : 0.0.0.0
port : )END" << tcpListenerPort
<< R"END(
address : ES
siteId : siteId
}
tcpConnector {
host : 127.0.0.1
port : )END" << tcpConnectorPort
<< R"END(
address : ES
siteId : siteId
}
log {
module: DEFAULT
enable: warning+
})END";
return router_config;
}
static std::stringstream multiRouterTcpConfig(std::string routerId, const std::vector<unsigned short> listenerPorts,
const std::vector<unsigned short> connectorPorts,
unsigned short tcpConnectorPort, unsigned short tcpListenerPort)
{
std::stringstream router_config;
router_config << R"END(
router {
mode: interior
id : )END" << routerId
<< R"END(
})END";
for (auto connectorPort : connectorPorts) {
router_config << R"END(
connector {
host: 127.0.0.1
port : )END" << connectorPort
<< R"END(
role: inter-router
})END";
}
for (auto listenerPort : listenerPorts) {
router_config << R"END(
listener {
host: 0.0.0.0
port : )END" << listenerPort
<< R"END(
role: inter-router
})END";
}
if (tcpListenerPort != 0) {
router_config << R"END(
tcpListener {
host : 0.0.0.0
port : )END" << tcpListenerPort
<< R"END(
address : ES
siteId : siteId
})END";
}
if (tcpConnectorPort != 0) {
router_config << R"END(
tcpConnector {
host : 127.0.0.1
port : )END" << tcpConnectorPort
<< R"END(
address : ES
siteId : siteId
})END";
}
router_config << R"END(
log {
module: DEFAULT
enable: warn+
})END";
return router_config;
}
static void writeRouterConfig(const std::string &configName, const std::stringstream &router_config)
{
std::fstream f(configName, std::ios::out);
f << router_config.str();
f.close();
}
static TCPSocket try_to_connect(const std::string &servAddress, int echoServPort)
{
auto then = std::chrono::steady_clock::now();
while (std::chrono::steady_clock::now() - then < std::chrono::seconds(3)) {
try {
TCPSocket sock(servAddress, echoServPort);
return sock;
} catch (SocketException &e) {
}
}
throw std::runtime_error("Failed to connect in time");
}
class LatencyMeasure
{
static const int RCVBUFSIZE = 32;
char echoBuffer[RCVBUFSIZE + 1]; // '\0'
std::string servAddress = "127.0.0.1";
std::string echoString = "echoString";
int echoStringLen = echoString.length();
public:
inline void latencyMeasureLoop(benchmark::State &state, unsigned short echoServerPort)
{
{
TCPSocket sock = try_to_connect(servAddress, echoServerPort);
latencyMeasureSendReceive(state, sock); // run once outside benchmark to clean the pipes first
for (auto _ : state) {
latencyMeasureSendReceive(state, sock);
}
}
}
inline void latencyMeasureSendReceive(benchmark::State &state, TCPSocket &sock)
{
sock.send(echoString.c_str(), echoStringLen);
int totalBytesReceived = 0;
while (totalBytesReceived < echoStringLen) {
int bytesReceived = sock.recv(echoBuffer, RCVBUFSIZE);
if (bytesReceived <= 0) {
state.SkipWithError("unable to read from socket");
}
totalBytesReceived += bytesReceived;
echoBuffer[bytesReceived] = '\0';
}
}
};
/// Measures latency between a TCP send and a receive.
/// There is only one request in flight at all times, so this is the
/// lowest conceivable latency at the most ideal condition
/// In addition, all sends are of the same (tiny) size
static void BM_TCPEchoServerLatencyWithoutQDR(benchmark::State &state)
{
EchoServerThread est;
LatencyMeasure lm;
lm.latencyMeasureLoop(state, est.port());
}
BENCHMARK(BM_TCPEchoServerLatencyWithoutQDR)->Unit(benchmark::kMillisecond);
class DispatchRouterThreadTCPLatencyTest
{
QDR mQdr{};
std::thread mT;
public:
DispatchRouterThreadTCPLatencyTest(const std::string configName)
{
Latch mx;
mT = std::thread([&mx, this, &configName]() {
mQdr.initialize(configName);
mQdr.wait();
mx.notify();
mQdr.run();
mQdr.deinitialize(false);
});
mx.wait();
}
~DispatchRouterThreadTCPLatencyTest()
{
mQdr.stop();
mT.join();
}
};
class DispatchRouterSubprocessTcpLatencyTest
{
int pid;
public:
DispatchRouterSubprocessTcpLatencyTest(std::string configName)
{
pid = fork();
if (pid == 0) {
// https://stackoverflow.com/questions/10761197/prctlpr-set-pdeathsig-signal-is-called-on-parent-thread-exit-not-parent-proc
prctl(PR_SET_PDEATHSIG, SIGHUP);
QDR qdr{};
qdr.initialize(configName);
qdr.wait();
qdr.run(); // this never returns until signal is sent, and then process dies
}
}
~DispatchRouterSubprocessTcpLatencyTest()
{
int ret = kill(pid, SIGTERM);
if (ret != 0) {
perror("Killing router");
}
int status;
ret = waitpid(pid, &status, 0);
if (ret != pid) {
perror("Waiting for child");
}
}
};
static void BM_TCPEchoServerLatency1QDRThread(benchmark::State &state)
{
auto est = make_unique<EchoServerThread>();
unsigned short tcpConnectorPort = est->port();
unsigned short tcpListenerPort = findFreePort();
std::string configName = "BM_TCPEchoServerLatency1QDRThread";
std::stringstream router_config = oneRouterTcpConfig(tcpConnectorPort, tcpListenerPort);
writeRouterConfig(configName, router_config);
DispatchRouterThreadTCPLatencyTest drt{configName};
{
LatencyMeasure lm;
lm.latencyMeasureLoop(state, tcpListenerPort);
}
// kill echo server first
// when dispatch is stopped first, echo server then sometimes hangs on socket recv, and dispatch leaks more
// (suppressed leaks):
/*
76: Assertion `leak_reports.length() == 0` failed in ../tests/c_benchmarks/../c_unittests/helpers.hpp line 136:
alloc.c: Items of type 'qd_buffer_t' remain allocated at shutdown: 3 (SUPPRESSED)
76: alloc.c: Items of type 'qd_message_t' remain allocated at shutdown: 2 (SUPPRESSED)
76: alloc.c: Items of type 'qd_message_content_t' remain allocated at shutdown: 2 (SUPPRESSED)
76: alloc.c: Items of type 'qdr_delivery_t' remain allocated at shutdown: 2 (SUPPRESSED)
*/
est.reset();
}
BENCHMARK(BM_TCPEchoServerLatency1QDRThread)->Unit(benchmark::kMillisecond);
static void BM_TCPEchoServerLatency1QDRSubprocess(benchmark::State &state)
{
EchoServerThread est;
unsigned short tcpConnectorPort = est.port();
unsigned short tcpListenerPort = findFreePort();
std::string configName = "BM_TCPEchoServerLatency1QDRSubprocess.conf";
std::stringstream router_config = oneRouterTcpConfig(tcpConnectorPort, tcpListenerPort);
writeRouterConfig(configName, router_config);
DispatchRouterSubprocessTcpLatencyTest drt(configName);
{
LatencyMeasure lm;
lm.latencyMeasureLoop(state, tcpListenerPort);
}
}
BENCHMARK(BM_TCPEchoServerLatency1QDRSubprocess)->Unit(benchmark::kMillisecond);
static void BM_TCPEchoServerLatency2QDRSubprocess(benchmark::State &state)
{
EchoServerThread est;
unsigned short listener_2 = findFreePort();
unsigned short tcpListener_2 = findFreePort();
std::string configName_1 = "BM_TCPEchoServerLatency2QDRSubprocess_1.conf";
std::stringstream router_config_1 = multiRouterTcpConfig("QDRL1", {}, {listener_2}, est.port(), 0);
writeRouterConfig(configName_1, router_config_1);
std::string configName_2 = "BM_TCPEchoServerLatency2QDRSubprocess_2.conf";
std::stringstream router_config_2 = multiRouterTcpConfig("QDRL2", {listener_2}, {}, 0, tcpListener_2);
writeRouterConfig(configName_2, router_config_2);
DispatchRouterSubprocessTcpLatencyTest qdr1{configName_1};
DispatchRouterSubprocessTcpLatencyTest qdr2{configName_2};
{
LatencyMeasure lm;
lm.latencyMeasureLoop(state, tcpListener_2);
}
}
BENCHMARK(BM_TCPEchoServerLatency2QDRSubprocess)->Unit(benchmark::kMillisecond);