blob: 177f3eaf8f759974b918aefe942e4656409ab87f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef USE_SMARTHEAP
#include <smrtheap.h>
#endif
#include <atomic>
#include <exception>
#include <iomanip>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <boost/process.hpp>
#include <boost/program_options.hpp>
#include <boost/interprocess/mapped_region.hpp>
#ifdef _WIN32
#include <boost/interprocess/windows_shared_memory.hpp>
#else
#include <boost/interprocess/shared_memory_object.hpp>
#endif
#include "fwklib/FwkException.hpp"
#define __DUNIT_NO_MAIN__
#include "fw_dunit.hpp"
#include "Utils.hpp"
namespace bp = boost::process;
namespace bip = boost::interprocess;
namespace bpo = boost::program_options;
static std::string g_programName;
static uint32_t g_coordinatorPid = 0;
ClientCleanup gClientCleanup;
namespace dunit {
/**
 * Configure where the MSVC debug runtime sends its reports.
 *
 * Only Windows DEBUG builds do anything here: warnings, errors and assertion
 * reports are sent both to stderr and to the debug message window. On every
 * other configuration the body is empty.
 */
void setupCRTOutput() {
#if defined(_WIN32) && defined(DEBUG)
  const int destinations = _CRTDBG_MODE_FILE | _CRTDBG_MODE_WNDW;
  const int reportKinds[] = {_CRT_ASSERT, _CRT_ERROR, _CRT_WARN};
  for (const int kind : reportKinds) {
    _CrtSetReportMode(kind, destinations);
    _CrtSetReportFile(kind, _CRTDBG_FILE_STDERR);
  }
  // SEM_FAILCRITICALERRORS: the system does not display the critical-error
  // message box.
  SetErrorMode(SEM_FAILCRITICALERRORS);
#endif
}
// some common values..
// Worker lifecycle states. These are the values passed to
// TestState::setWorkerState() and compared against
// TestState::getWorkerState() by the coordinator and the workers.

// Set by the TestWorker constructor; counted by TestDriver::waitForReady().
#define WORKER_STATE_READY 1
// Set by the TestWorker destructor; counted by TestDriver::waitForDone().
#define WORKER_STATE_DONE 2
// Set by the worker immediately before it runs a task.
#define WORKER_STATE_TASK_ACTIVE 3
// Set by the worker after a task finishes (or after an error was handled);
// TestDriver::waitForCompletion() polls for this value.
#define WORKER_STATE_TASK_COMPLETE 4
// Set by the coordinator just before it hands a task to a worker.
#define WORKER_STATE_SCHEDULED 5
// Forward declaration of the three-argument log overload defined later in
// this file.
void log(std::string s, int lineno, const char *filename);
/**
 * Uniquely represents each different worker.
 *
 * Ids 1..4 map to the names "s1p1", "s1p2", "s2p1" and "s2p2"; id 0 maps to
 * the placeholder name "none".
 */
class WorkerId {
 private:
  /** Number of entries in m_idNames. */
  static const uint32_t kIdNameCount = 5;

  uint32_t m_id;
  static const char *m_idNames[kIdNameCount];

 public:
  explicit WorkerId(uint32_t id) : m_id(id) {}

  /** @return the numeric id this object was constructed with. */
  int getId() const { return static_cast<int>(m_id); }

  /**
   * @return the symbolic name for this id, or "unknown" when the id has no
   *         entry in the name table. The table used to be indexed without a
   *         range check, which read out of bounds for ids above 4.
   */
  const char *getIdName() const {
    return (m_id < kIdNameCount) ? m_idNames[m_id] : "unknown";
  }

  /** return the system id for this process (always 1). */
  int getSystem() const { return 1; }

  /** return the process id for this system: 2 for even ids, 1 for odd. */
  int getProcOnSys() const { return ((m_id % 2) == 0) ? 2 : 1; }
};
const char *WorkerId::m_idNames[WorkerId::kIdNameCount] = {
    "none", "s1p1", "s1p2", "s2p1", "s2p2"};
/**
 * Lets a Task discover the name of its dynamic type through RTTI. The text
 * returned by type_info::name() is implementation-defined.
 */
std::string Task::typeName() {
  const auto &dynamicType = typeid(*this);
  return std::string(dynamicType.name());
}
typedef std::list<Task *> TaskList;
/** contains a queue of Task* for each WorkerId. */
class TaskQueues {
private:
std::map<int, TaskList> m_qmap;
std::list<int> m_schedule;
TaskQueues() : m_qmap(), m_schedule() {}
void registerTask(WorkerId sId, Task *task) {
m_qmap[sId.getId()].push_back(task);
m_schedule.push_back(sId.getId());
}
Task *nextTask(WorkerId &sId) {
TaskList *tasks = &(m_qmap[sId.getId()]);
if (tasks->empty()) {
return nullptr;
}
Task *task = tasks->front();
if (task != nullptr) {
LOG(std::string("receieved task: ") + task->m_taskName);
tasks->pop_front();
}
return task;
}
int nextWorkerId() {
if (m_schedule.empty()) {
return 0;
}
int sId = m_schedule.front();
LOGCOORDINATOR(std::string("Next worker id id : ") + std::to_string(sId));
m_schedule.pop_front();
return sId;
}
static TaskQueues *taskQueues;
public:
static void addTask(WorkerId sId, Task *task) {
if (taskQueues == nullptr) {
taskQueues = new TaskQueues();
}
taskQueues->registerTask(sId, task);
}
static int getWorkerId() {
ASSERT(taskQueues != nullptr, "failure to initialize fw_dunit module.");
return taskQueues->nextWorkerId();
}
static Task *getTask(WorkerId sId) {
ASSERT(taskQueues != nullptr, "failure to initialize fw_dunit module.");
return taskQueues->nextTask(sId);
}
};
TaskQueues *TaskQueues::taskQueues = nullptr;
/** register task with worker. */
void Task::init(int sId) { init(sId, false); }
void Task::init(int sId, bool isHeapAllocated) {
m_isHeapAllocated = isHeapAllocated;
m_id = sId;
m_taskName = this->typeName();
TaskQueues::addTask(WorkerId(sId), this);
}
// Coordination state shared between the coordinator and the workers.
// Dunit::getState() reinterprets the start of the mapped globals region as a
// TestState, so this layout is what every participating process sees.
// Worker ids passed to the accessors are 1-based; the implementations index
// the arrays with id - 1 and perform no range checking.
class TestState {
public:
// Number of worker slots tracked in the arrays below.
static const auto WORKER_COUNT = 4U;
// Clear the flags and next worker; set every state to 0 and timeout to -1.
void reset();
// Store / read the per-task timeout (in seconds) for worker `id`.
void setWorkerTimeout(int id, int seconds);
int getWorkerTimeout(int id) const;
// Store / read the WORKER_STATE_* value for worker `id`.
void setWorkerState(int id, uint8_t state);
int getWorkerState(int id) const;
// Id of the worker whose turn it is to run a task; workers reset it to 0
// when they pick the turn up.
void setNextWorker(int id);
int getNextWorker();
// Mark / query that the test run has failed.
void fail();
bool failed() const;
// Mark / query that workers should leave their task loop.
void terminate();
bool terminated() const;
private:
bool failure_;
bool terminate_;
int next_worker_;
int worker_timeout_[WORKER_COUNT];
uint8_t worker_state_[WORKER_COUNT];
};
/**
 * Main framework entry.
 *
 * Owns the two shared-memory segments used by a test run:
 *  - a plain segment of sizeof(TestState) bytes that is mapped and exposed
 *    through getState();
 *  - a managed segment of MANAGED_STATE_SIZE bytes exposed through
 *    getManagedState().
 *
 * The coordinator creates both segments (removing stale ones first) and
 * removes them again on destruction; workers only open them. Segment names
 * are derived from the TESTNAME environment variable and the coordinator pid.
 */
class Dunit {
 private:
  static const auto MANAGED_STATE_SIZE = 1UL << 17UL;
  static Dunit *singleton;
  bool coordinator_;
  bip::mapped_region globals_region_;
#ifdef _WIN32
  bip::windows_shared_memory globals_shm_;
#else
  bip::shared_memory_object globals_shm_;
#endif
  bip::managed_shared_memory managed_state_;

  explicit Dunit(bool coordinator) : coordinator_(coordinator) {
    if (coordinator) {
      // Drop anything left behind under the same names before creating.
      removeStates();
#ifdef _WIN32
      globals_shm_ =
          bip::windows_shared_memory{bip::create_only, getSharedName(),
                                     bip::read_write, sizeof(TestState)};
#else
      globals_shm_ = bip::shared_memory_object{
          bip::create_only, getSharedName(), bip::read_write};
      globals_shm_.truncate(sizeof(TestState));
#endif
      managed_state_ = bip::managed_shared_memory{
          bip::create_only, getManagedStateName(), MANAGED_STATE_SIZE};
    } else {
      using shared_memory =
#ifdef _WIN32
          bip::windows_shared_memory;
#else
          bip::shared_memory_object;
#endif
      globals_shm_ =
          shared_memory{bip::open_only, getSharedName(), bip::read_write};
      managed_state_ =
          bip::managed_shared_memory{bip::open_only, getManagedStateName()};
    }
    globals_region_ = bip::mapped_region{globals_shm_, bip::read_write};
    if (coordinator) {
      // Only the creator initialises the shared TestState.
      getState()->reset();
    }
  }

  ~Dunit() {
    if (coordinator_) {
      removeStates();
    }
  }

  /** Remove both named segments; used before creation and on shutdown. */
  static void removeStates() {
    bip::shared_memory_object::remove(getSharedName());
    bip::shared_memory_object::remove(getManagedStateName());
  }

  /**
   * @return the value of the TESTNAME environment variable, or "dunit" when
   *         it is not set. std::getenv returns a null pointer for a missing
   *         variable, and building a std::string from that is undefined
   *         behaviour, hence the fallback.
   */
  static std::string getTestName() {
    const char *testName = std::getenv("TESTNAME");
    return (testName != nullptr) ? std::string{testName}
                                 : std::string{"dunit"};
  }

  /** @return name of the TestState segment: "<TESTNAME>.<coordinator pid>". */
  static const char *getSharedName() {
    static std::string name =
        getTestName() + '.' + std::to_string(g_coordinatorPid);
    return name.c_str();
  }

  /**
   * @return name of the managed segment:
   *         "<TESTNAME>.managed.<coordinator pid>".
   */
  static const char *getManagedStateName() {
    static std::string name =
        getTestName() + ".managed." + std::to_string(g_coordinatorPid);
    return name.c_str();
  }

 public:
  /** call this once just inside main... */
  static void init(bool coordinator) { singleton = new Dunit(coordinator); }

  /** return the already initialized singleton Dunit instance. */
  static Dunit *getSingleton() {
    ASSERT(singleton != nullptr, "singleton not created yet.");
    return singleton;
  }

  /** delete the existing singleton */
  static void close() {
    // Clear the pointer first so nothing can reach the object while it is
    // being destroyed.
    Dunit *tmp = singleton;
    singleton = nullptr;
    delete tmp;
  }

  /** @return the TestState located at the start of the mapped region. */
  TestState *getState() {
    return reinterpret_cast<TestState *>(globals_region_.get_address());
  }

  /** @return the managed shared-memory segment. */
  bip::managed_shared_memory &getManagedState() { return managed_state_; }
};
#define DUNIT dunit::Dunit::getSingleton()
Dunit *Dunit::singleton = nullptr;
void TestState::reset() {
next_worker_ = 0;
failure_ = false;
terminate_ = false;
for (auto i = 0U; i < WORKER_COUNT; ++i) {
worker_state_[i] = 0;
worker_timeout_[i] = -1;
}
}
// Worker ids are 1-based; slot (id - 1) holds the data for worker `id`.

/** Store the timeout, in seconds, for worker `id`. */
void TestState::setWorkerTimeout(int id, int seconds) {
  const int slot = id - 1;
  worker_timeout_[slot] = seconds;
}

/** @return the timeout stored for worker `id`. */
int TestState::getWorkerTimeout(int id) const {
  const int slot = id - 1;
  return worker_timeout_[slot];
}

/** Store the WORKER_STATE_* value for worker `id`. */
void TestState::setWorkerState(int id, uint8_t state) {
  const int slot = id - 1;
  worker_state_[slot] = state;
}

/** @return the state stored for worker `id`. */
int TestState::getWorkerState(int id) const {
  const int slot = id - 1;
  return worker_state_[slot];
}

/** Record which worker should run next. */
void TestState::setNextWorker(int id) {
  next_worker_ = id;
}

/** @return the id of the worker that should run next. */
int TestState::getNextWorker() {
  return next_worker_;
}

/** Flag the run as failed. */
void TestState::fail() {
  failure_ = true;
}

/** @return true once fail() has been called. */
bool TestState::failed() const {
  return failure_;
}

/** Request termination. */
void TestState::terminate() {
  terminate_ = true;
}

/** @return true once terminate() has been called. */
bool TestState::terminated() const {
  return terminate_;
}
/**
 * Set the timeout for this task's worker. A non-positive value selects the
 * default TASK_TIMEOUT.
 */
void Task::setTimeout(int seconds) {
  auto sharedState = DUNIT->getState();
  const int effectiveSeconds = (seconds > 0) ? seconds : TASK_TIMEOUT;
  sharedState->setWorkerTimeout(m_id, effectiveSeconds);
}
/**
 * Runs one worker executable as a child process.
 *
 * start() launches a thread that spawns the child from the stored command
 * line and blocks until it exits. running() reports whether that thread is
 * still going, and is read from a different thread than the one that clears
 * it, which is why the flag is atomic.
 */
class TestProcess {
 public:
  TestProcess(const std::string &cmdline, uint32_t id)
      : id_{id}, running_{false}, cmd_{cmdline} {}

  /**
   * Joins the monitor thread if it is still joinable. Destroying a joinable
   * std::thread calls std::terminate, so this keeps destruction safe even
   * when the owner did not call stop() first.
   */
  ~TestProcess() { stop(); }

  WorkerId &getWorkerId() { return id_; }

  /**
   * Thread body: spawn the child, wait for it, report a non-zero exit code.
   * Every failure is reported on std::clog and ends with running() == false;
   * no exception is allowed to leave the thread.
   */
  void run() {
    try {
      auto arguments = bpo::split_unix(cmd_);
      if (arguments.empty()) {
        std::clog << "Worker " << id_.getIdName()
                  << " has an empty command line" << std::endl;
      } else {
        const std::string exe = arguments.front();
        arguments.erase(arguments.begin());
        process_ = bp::child(exe, bp::args = arguments);
        process_.wait();
        if (process_.exit_code() != 0) {
          std::clog << "Worker " << id_.getIdName() << " exited with code "
                    << process_.exit_code() << std::endl;
        }
      }
    } catch (const std::exception &ex) {
      std::clog << "Worker " << id_.getIdName()
                << " could not be run: " << ex.what() << std::endl;
    } catch (...) {
      std::clog << "Worker " << id_.getIdName()
                << " could not be run: unknown error" << std::endl;
    }
    running_ = false;
  }

  /** Mark the process as running and launch the monitor thread. */
  void start() {
    // Set before the thread exists so running() is already true on return.
    running_ = true;
    thread_ = std::thread{[this]() { run(); }};
  }

  /** Block until the monitor thread (and therefore the child) has finished. */
  void stop() {
    if (thread_.joinable()) {
      thread_.join();
    }
  }

  /** @return true from start() until the monitor thread finishes run(). */
  bool running() const { return running_.load(); }

 protected:
  WorkerId id_;
  std::atomic<bool> running_;
  std::string cmd_;
  bp::child process_;
  std::thread thread_;
};
/**
* Container of TestProcess(es) held in driver. each represents one of the
* legal WorkerIds spawned when TestDriver is created.
*/
class TestDriver {
private:
TestProcess *m_workers[4];
public:
TestDriver() {
dunit::Dunit::init(true);
std::cout << "Coordinator starting workers.\n";
for (uint32_t i = 1; i < 5; i++) {
std::string cmdline;
cmdline = g_programName + " -s" + std::to_string(i) + " -m" +
std::to_string(g_coordinatorPid);
std::cout << cmdline.c_str() << "\n";
m_workers[i - 1] = new TestProcess(cmdline, i);
}
std::cout << std::flush;
// start each of the workers...
for (uint32_t j = 1; j < 5; j++) {
m_workers[j - 1]->start();
std::this_thread::sleep_for(
std::chrono::seconds(2)); // do not increase this to avoid precheckin
// runs taking much longer.
}
}
~TestDriver() {
for (uint32_t i = 0; i < TestState::WORKER_COUNT;) {
auto worker = m_workers[i++];
worker->stop();
delete worker;
}
dunit::Dunit::close();
}
int begin() {
std::cout << "Coordinator started with pid "
<< boost::this_process::get_id() << "\n"
<< std::flush;
waitForReady();
// dispatch task...
int nextWorker;
auto state = DUNIT->getState();
while ((nextWorker = TaskQueues::getWorkerId()) != 0) {
WorkerId sId(nextWorker);
state->setWorkerState(nextWorker, WORKER_STATE_SCHEDULED);
std::cout << "Set next process to " << sId.getIdName() << "\n"
<< std::flush;
state->setNextWorker(nextWorker);
waitForCompletion(sId);
// check special conditions.
if (state->failed()) {
state->terminate();
waitForDone();
return 1;
}
}
// end all work..
state->terminate();
waitForDone();
return 0;
}
/** wait for an individual worker to finish a task. */
void waitForCompletion(WorkerId &sId) {
auto id = sId.getId();
auto state = DUNIT->getState();
int secs = state->getWorkerTimeout(id);
state->setWorkerTimeout(id, TASK_TIMEOUT);
if (secs <= 0) {
secs = TASK_TIMEOUT;
}
std::cout << "Waiting " << secs << " seconds for " << sId.getIdName()
<< " to finish task.\n"
<< std::flush;
auto end = std::chrono::steady_clock::now() + std::chrono::seconds{secs};
while (state->getWorkerState(id) != WORKER_STATE_TASK_COMPLETE) {
// sleep a bit..
if (state->failed()) {
return;
}
std::this_thread::sleep_for(std::chrono::milliseconds{100});
checkWorkerDeath();
auto now = std::chrono::steady_clock::now();
if (now >= end) {
handleTimeout(sId);
break;
}
}
}
void handleTimeout() {
std::cout << "Error: Timed out waiting for all workers to be ready.\n"
<< std::flush;
auto state = DUNIT->getState();
state->terminate();
state->fail();
}
void handleTimeout(WorkerId &sId) {
std::cout << "Error: Timed out waiting for " << sId.getIdName()
<< " to finish task.\n"
<< std::flush;
auto state = DUNIT->getState();
state->terminate();
state->fail();
}
/** wait for all workers
* to be done initializing. */
void waitForReady() {
auto state = DUNIT->getState();
std::cout << "Waiting " << TASK_TIMEOUT
<< " seconds for all workers to be ready.\n"
<< std::flush;
auto end =
std::chrono::steady_clock::now() + std::chrono::seconds{TASK_TIMEOUT};
uint32_t readyCount = 0;
while (readyCount < TestState::WORKER_COUNT) {
std::cout << "Ready Count: " << readyCount << "\n" << std::flush;
if (state->failed()) {
return;
}
std::this_thread::sleep_for(std::chrono::seconds{1});
readyCount = 0;
for (uint32_t i = 1; i < 5; i++) {
int status = state->getWorkerState(i);
if (status == WORKER_STATE_READY) {
++readyCount;
}
}
checkWorkerDeath();
auto now = std::chrono::steady_clock::now();
if (now >= end) {
handleTimeout();
break;
}
}
}
/** wait for all workers to be destroyed. */
void waitForDone() {
auto state = DUNIT->getState();
std::cout << "Waiting " << TASK_TIMEOUT
<< " seconds for all workers to complete.\n"
<< std::flush;
uint32_t doneCount = 0;
auto end =
std::chrono::steady_clock::now() + std::chrono::seconds{TASK_TIMEOUT};
while (doneCount < TestState::WORKER_COUNT) {
// if ( DUNIT->getFailed() ) return;
// sleep a bit..
std::this_thread::sleep_for(std::chrono::milliseconds{100});
doneCount = 0;
for (uint32_t i = 1; i < 5; i++) {
int status = state->getWorkerState(i);
if (status == WORKER_STATE_DONE) {
++doneCount;
}
}
auto now = std::chrono::steady_clock::now();
if (now >= end) {
handleTimeout();
break;
}
}
}
/** test to see that all the worker processes are still around, or throw
a TestException so the driver doesn't get hung. */
void checkWorkerDeath() {
auto state = DUNIT->getState();
for (uint32_t i = 0; i < TestState::WORKER_COUNT; i++) {
if (!m_workers[i]->running()) {
auto msg = std::string("Error: Worker ") +
m_workers[i]->getWorkerId().getIdName() +
" terminated prematurely";
LOG(msg);
state->fail();
state->terminate();
FAIL(msg);
}
}
}
};
class TestWorker {
private:
WorkerId m_sId;
public:
static WorkerId *procWorkerId;
explicit TestWorker(int id) : m_sId(id) {
procWorkerId = new WorkerId(id);
dunit::Dunit::init(false);
DUNIT->getState()->setWorkerState(m_sId.getId(), WORKER_STATE_READY);
std::clog << "Started worker " << id << std::endl;
}
~TestWorker() {
DUNIT->getState()->setWorkerState(m_sId.getId(), WORKER_STATE_DONE);
dunit::Dunit::close();
}
void begin() {
auto state = DUNIT->getState();
std::cout << "Worker " << m_sId.getIdName() << " started with pid "
<< boost::this_process::get_id() << "\n"
<< std::flush;
// consume tasks of this workers queue, only when it is his turn..
while (!state->terminated()) {
if (state->getNextWorker() == m_sId.getId()) {
// set next worker to zero so I don't accidently run twice.
state->setNextWorker(0);
// do next task...
Task *task = TaskQueues::getTask(m_sId);
// perform task.
if (task != nullptr) {
state->setWorkerState(m_sId.getId(), WORKER_STATE_TASK_ACTIVE);
try {
task->doTask();
if (task->m_isHeapAllocated) {
delete task;
}
fflush(stdout);
state->setWorkerState(m_sId.getId(), WORKER_STATE_TASK_COMPLETE);
} catch (TestException te) {
if (task->m_isHeapAllocated) {
delete task;
}
te.print();
handleError();
return;
} catch (...) {
if (task->m_isHeapAllocated) {
delete task;
}
LOG("Unhandled exception, terminating.");
handleError();
return;
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds{100});
}
}
void handleError() {
auto state = DUNIT->getState();
state->fail();
state->terminate();
state->setWorkerState(m_sId.getId(), WORKER_STATE_TASK_COMPLETE);
}
};
WorkerId *TestWorker::procWorkerId = nullptr;
/**
 * Pause the calling thread for `millis` milliseconds; a value of 0 only
 * yields the rest of the current time slice.
 */
void sleep(int millis) {
  if (millis != 0) {
    const std::chrono::milliseconds pause{millis};
    std::this_thread::sleep_for(pause);
    return;
  }
  std::this_thread::yield();
}
/**
 * Shared implementation of the log functions below. Writes one line to
 * std::cout of the form
 *   [TEST YYYY/MM/DD HH:MM:SS.uuuuuu <who>:pid(<pid>)] <message> at line: <n>
 *
 * @param who     label identifying the writer (worker name, "coordinator", ...)
 * @param message text to log
 * @param lineno  source line number reported with the message
 */
static void writeLogLine(const char *who, const std::string &message,
                         int lineno) {
  using std::chrono::duration_cast;
  using std::chrono::microseconds;
  using std::chrono::system_clock;
  const auto now = system_clock::now();
  const auto in_time_t = system_clock::to_time_t(now);
  const auto local = std::localtime(&in_time_t);
  // Microseconds within the current second. This used to be "% 1000", which
  // kept only the last three digits and so printed a wrong six-digit field.
  const auto usec =
      duration_cast<microseconds>(now.time_since_epoch()).count() % 1000000;
  // setfill is sticky on the stream, so put the previous fill character back
  // once the zero-padded field has been written.
  const auto previousFill = std::cout.fill();
  std::cout << "[TEST " << std::put_time(local, "%Y/%m/%d %H:%M:%S") << '.'
            << std::setfill('0') << std::setw(6) << usec
            << std::setfill(previousFill) << ' ' << who << ":pid("
            << boost::this_process::get_id() << ")] " << message
            << " at line: " << lineno << std::endl;
}

/** log a message on behalf of the coordinator. */
void logCoordinator(std::string s, int lineno, const char * /*filename*/) {
  writeLogLine("coordinator", s, lineno);
}

// log a message and print the worker id as well.. used by fw_helper with no
// worker id.
void log(std::string s, int lineno, const char * /*filename*/, int /*id*/) {
  writeLogLine("0", s, lineno);
}

// log a message and print the worker id as well..
// The label is the name of this process's worker when TestWorker has set
// procWorkerId, and "coordinator" otherwise.
void log(std::string s, int lineno, const char * /*filename*/) {
  const char *who = dunit::TestWorker::procWorkerId
                        ? dunit::TestWorker::procWorkerId->getIdName()
                        : "coordinator";
  writeLogLine(who, s, lineno);
}
/**
 * Entry point shared by the coordinator and the worker processes.
 *
 * Options:
 *   -s / --worker <id>        run as worker <id>; without it the process acts
 *                             as the coordinator and spawns the workers.
 *   -m / --coordinator <pid>  pid of the coordinator; defaults to this
 *                             process's own pid.
 *   --help                    print the option summary and exit with 0.
 *
 * @return the coordinator's result (0 on success, 1 on failure), 0 for a
 *         worker that ran to the end or for --help, and 1 when an exception
 *         reached this function.
 */
int dmain(int argc, char *argv[]) {
  using apache::geode::client::Utils;
#ifdef USE_SMARTHEAP
  MemRegisterTask();
#endif
  setupCRTOutput();
  // NOTE(review): std::stoi throws when TIMEBOMB does not hold a number, and
  // this runs outside the try block below - confirm the launcher always sets it.
  auto timebomb = std::chrono::seconds{std::stoi(Utils::getEnv("TIMEBOMB"))};
  TimeBomb tb(timebomb, []() { gClientCleanup.trigger(); });
  tb.arm();
  g_programName = argv[0];
  int result = 0;
  int workerId = 0;
  try {
    // Parsing happens inside the try block so that a malformed command line
    // is reported by the handlers below instead of terminating the process.
    bpo::options_description generic("Options");
    auto &&options = generic.add_options();
    options("worker,s", bpo::value<int>(), "Set worker ID");
    options("coordinator,m", bpo::value<int>(), "Set coordinator PID");
    options("help", "Shows this help");
    bpo::variables_map vm;
    bpo::store(bpo::parse_command_line(argc, argv, generic), vm);
    bpo::notify(vm);
    if (vm.count("help") != 0) {
      // The option was declared but never acted on before.
      std::cout << generic << "\n" << std::flush;
      return 0;
    }
    auto iter = vm.find("worker");
    if (iter != vm.end()) {
      workerId = iter->second.as<int>();
    }
    iter = vm.find("coordinator");
    if (iter != vm.end()) {
      g_coordinatorPid = iter->second.as<int>();
    } else {
      g_coordinatorPid = boost::this_process::get_id();
    }
    if (workerId > 0) {
      dunit::TestWorker worker(workerId);
      worker.begin();
    } else {
      dunit::TestDriver tdriver;
      result = tdriver.begin();
      if (result == 0) {
        std::cout << "#### All Tasks completed successfully. ####\n";
      } else {
        std::cout << "#### FAILED. ####\n";
      }
      fflush(stdout);
    }
    std::cout << "final worker id " << workerId << ", result " << result
              << "\n";
    std::cout << "before calling cleanup " << workerId << "\n";
    gClientCleanup.trigger();
    std::cout << "after calling cleanup\n";
    return result;
  } catch (dunit::TestException &te) {
    te.print();
  } catch (apache::geode::client::testframework::FwkException &fe) {
    std::cout << "Exception: " << fe.what() << "\n" << std::flush;
  } catch (std::exception &ex) {
    std::cout << "Exception: system exception reached main: " << ex.what()
              << ".\n"
              << std::flush;
  } catch (...) {
    std::cout << "Exception: unhandled/unidentified exception reached main.\n"
              << std::flush;
  }
  // Only reached after an exception: clean up and report failure.
  gClientCleanup.trigger();
  return 1;
}
/**
 * Entry point for test code modules to reach the naming service: returns the
 * managed shared-memory segment owned by the Dunit singleton.
 */
bip::managed_shared_memory &globals() {
  auto *framework = DUNIT;
  return framework->getManagedState();
}
} // namespace dunit