blob: 899a00c2da3f5a8633090e664800aadd5f9b349c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef QPID_DISPATCH_HELPERS_HPP
#define QPID_DISPATCH_HELPERS_HPP
#include <cassert>
#include <condition_variable>
#include <fstream>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
// assertions without stack traces when running outside doctest
#ifndef QDR_DOCTEST
// https://stackoverflow.com/questions/3767869/adding-message-to-assert
#define REQUIRE(condition) assert(condition)
#define REQUIRE_MESSAGE(condition, message) \
do { \
if (! (condition)) { \
std::cerr << "Assertion `" #condition "` failed in " << __FILE__ \
<< " line " << __LINE__ << ": " << (message) << std::endl; \
std::terminate(); \
} \
} while (false)
#define CHECK_MESSAGE(condition, message) \
do { \
if (! (condition)) { \
std::cerr << "Assertion `" #condition "` failed in " << __FILE__ \
<< " line " << __LINE__ << ": " << (message) << std::endl; \
} \
} while (false)
#endif // QDR_DOCTEST
extern "C" {
#include <dispatch_private.h>
#include <qpid/dispatch/dispatch.h>
#include <router_core/router_core_private.h>
// declarations that don't have .h file
void qd_router_setup_late(qd_dispatch_t *qd);
}
// low-level router initialization
extern "C" {
#include "entity_cache.h"
#include "log_private.h"
#include "qpid/dispatch.h"
// declarations that don't have .h file
void qd_error_initialize();
}
// backport of C++14 feature
template <class T>
using remove_const_t = typename std::remove_const<T>::type;
// https://stackoverflow.com/questions/17902405/how-to-implement-make-unique-function-in-c11
template<typename T, typename... Args>
std::unique_ptr<T> make_unique(Args&&... args)
{
return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}
// https://stackoverflow.com/questions/27440953/stdunique-ptr-for-c-functions-that-need-free
struct free_deleter {
template <typename T>
void operator()(T *p) const
{
std::free(const_cast<remove_const_t<T> *>(p));
}
};
template <typename T>
using unique_C_ptr = std::unique_ptr<T, free_deleter>;
static_assert(sizeof(char *) == sizeof(unique_C_ptr<char>), ""); // ensure no overhead
// https://stackoverflow.com/questions/65290961/can-i-succintly-declare-stdunique-ptr-with-custom-deleter
template <typename T, typename Deleter>
std::unique_ptr<T, Deleter> qd_make_unique(T *raw, Deleter deleter)
{
return std::unique_ptr<T, Deleter>(raw, deleter);
}
/* the above allows replacing the first declaration with the second, shorter, one:
std::unique_ptr<qdr_link_t, decltype(&free_qdr_link_t)> link{new_qdr_link_t(), free_qdr_link_t};
auto link = qd_make_unique(new_qdr_link_t(), free_qdr_link_t);
*/
/// Redirects leak reports to a file, and fails the test if
/// anything is reported (even suppressed leaks).
class WithNoMemoryLeaks
{
bool fail;
public:
unique_C_ptr<char> path_ptr{strdup("unittests_memory_debug_logs_XXXXXX")};
explicit WithNoMemoryLeaks(bool fail = true)
{
this->fail = fail;
#if QD_MEMORY_DEBUG
int fd = mkstemp(path_ptr.get());
(void) fd; // unused
REQUIRE(fd != -1);
qd_alloc_debug_dump(path_ptr.get());
#endif // QD_MEMORY_DEBUG
}
~WithNoMemoryLeaks()
{
#if QD_MEMORY_DEBUG
std::ifstream f(path_ptr.get());
std::stringstream buffer;
buffer << f.rdbuf();
std::string leak_reports = buffer.str();
if (fail) {
REQUIRE_MESSAGE(leak_reports.length() == 0, leak_reports);
} else {
CHECK_MESSAGE(leak_reports.length() == 0, leak_reports);
}
qd_alloc_debug_dump(nullptr);
f.close();
#endif // QD_MEMORY_DEBUG
}
};
/// Submits an action to the router's action list. When action runs, we know router finished all previous actions.
///
/// This can be used to detect the router finished starting (i.e., performed all previously scheduled actions).
class RouterStartupLatch
{
public:
std::mutex mut;
std::condition_variable cv;
bool done = false;
void wait_for_qdr(qd_dispatch_t *qd)
{
qdr_action_handler_t action_handler = [](qdr_core_t *core, qdr_action_t *action, bool discard) {
auto that = static_cast<RouterStartupLatch *>(action->args.general.context_1);
std::lock_guard<std::mutex> lock(that->mut);
that->done = true;
that->cv.notify_one();
};
qdr_action_t *action = qdr_action(action_handler, "RouterStartupLatch action");
action->args.general.context_1 = this;
qdr_action_enqueue(qd->router->router_core, action);
std::unique_lock<std::mutex> lock(mut);
cv.wait(lock, [this] { return done; }); // wait for action_handler to notify us
}
};
inline std::string get_env(std::string const &key)
{
char *val = std::getenv(key.c_str());
return val == nullptr ? std::string("") : std::string(val);
}
/// Manages the router lifecycle
/**
Only one QDR instance may exist at the same time,
and each time it has to be started in a fresh std::thread.
The router uses global state, especially in relation to the Python module. That
is why things break in weird ways if there is more than one running. It is
possible to fork() and run multiple routers that way, but that gives no advantage
over subprocess in Python system-tests, it only complicates things.
It is not possible to initialize the router multiple times in the same thread, due to
alloc pools declared as `extern __thread qd_alloc_pool_t *`. These will have wrong values
the second time around, and there is no good way to hunt them all down and NULL them.
*/
class QDR
{
// protects global variables around router startup and pool leak dumping
static std::mutex startup_shutdown_lock;
public:
qd_dispatch_t *qd;
/// prepare the smallest amount of things that qd_dispatch_free needs to be present
void initialize(const std::string &config_path = "")
{
const std::lock_guard<std::mutex> lock(QDR::startup_shutdown_lock);
qd = qd_dispatch(nullptr, false);
REQUIRE(qd != nullptr);
// qd can be configured at this point, e.g. qd->thread_count
if (!config_path.empty()) {
// call qd_dispatch_load_config to get management agent initialized
// const std::string &source_dir = get_env("CMAKE_CURRENT_SOURCE_DIR");
REQUIRE(qd_dispatch_validate_config(config_path.c_str()) == QD_ERROR_NONE);
REQUIRE(qd_dispatch_load_config(qd, config_path.c_str()) == QD_ERROR_NONE);
} else {
// this is the abbreviated setup load_config() calls from Python, this way we can sometimes skip loading a
// config file
REQUIRE(qd_dispatch_prepare(qd) == QD_ERROR_NONE);
qd_router_setup_late(qd); // sets up e.g. qd->router->router_core
}
};
/// cleaning up too early after init will lead to leaks and other
/// unpleasantries (I observed some invalid pointer accesses)
void wait() const
{
// give the router core thread an action; when that executes, we're done starting up
RouterStartupLatch{}.wait_for_qdr(qd);
}
/// Runs the router in the current thread (+ any new threads router decides to spawn).
///
/// This method blocks until stop() is called.
void run() const
{
qd_server_run(qd);
}
void stop() const
{
qd_server_stop(qd);
}
/// Frees the router and optionally checks for leaks.
void deinitialize(bool check_leaks = true) const
{
const std::lock_guard<std::mutex> lock(QDR::startup_shutdown_lock);
const WithNoMemoryLeaks wnml{check_leaks};
qd_dispatch_free(qd);
// cache is a global var and sanitizers are ok with not freeing what's reachable through it
// however, redeclaring it in a second router without freeing what then becomes unreachable creates leak
qd_entity_cache_free_entries();
};
};
/// Synchronizes two threads. One waits at the latch, the other releases the latch.
class Latch
{
std::mutex mut;
std::condition_variable cv;
bool opened = false;
public:
void notify()
{
std::lock_guard<std::mutex> lock(mut);
opened = true;
cv.notify_all();
}
void wait()
{
std::unique_lock<std::mutex> lock(mut);
cv.wait(lock, [this] { return opened; });
}
};
class QDRMinimalEnv
{
public:
QDRMinimalEnv()
{
qd_alloc_initialize();
qd_log_initialize();
qd_error_initialize();
}
~QDRMinimalEnv()
{
qd_log_finalize();
qd_alloc_finalize();
qd_entity_cache_free_entries();
}
};
#endif // QPID_DISPATCH_HELPERS_HPP