blob: ab2b8f36527cfc8c173750c2068932a06ba4d5ba [file] [log] [blame]
#include "ThreadPoolImpl.h"
#include "absl/memory/memory.h"
#include "asio/executor_work_guard.hpp"
#include "asio/io_context.hpp"
#include "asio/post.hpp"
#include "rocketmq/RocketMQ.h"
#include "rocketmq/State.h"
#include "spdlog/spdlog.h"
#include <atomic>
#include <cstdint>
#include <exception>
#include <system_error>
ROCKETMQ_NAMESPACE_BEGIN
ThreadPoolImpl::ThreadPoolImpl(std::uint16_t workers)
: work_guard_(
absl::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(context_.get_executor())),
workers_(workers) {
}
void ThreadPoolImpl::start() {
for (std::uint16_t i = 0; i < workers_; i++) {
std::thread worker([this]() {
State expected = State::CREATED;
if (state_.compare_exchange_strong(expected, State::STARTED, std::memory_order_relaxed)) {
absl::MutexLock lk(&start_mtx_);
start_cv_.SignalAll();
}
while (true) {
#ifdef __EXCEPTIONS
try {
#endif
std::error_code ec;
context_.run(ec);
if (ec) {
SPDLOG_WARN("Error raised from ThreadPool: {}", ec.message());
}
#ifdef __EXCEPTIONS
} catch (std::exception& e) {
SPDLOG_WARN("Exception raised from ThreadPool: {}", e.what());
}
#endif
if (State::STARTED != state_.load(std::memory_order_relaxed)) {
SPDLOG_INFO("A thread-pool worker quit");
break;
}
}
});
threads_.emplace_back(std::move(worker));
}
{
absl::MutexLock lk(&start_mtx_);
if (State::CREATED == state_.load(std::memory_order_relaxed)) {
start_cv_.Wait(&start_mtx_);
}
}
}
void ThreadPoolImpl::shutdown() {
State expected = State::STARTED;
if (state_.compare_exchange_strong(expected, State::STOPPING, std::memory_order_relaxed)) {
work_guard_->reset();
context_.stop();
for (auto& thread : threads_) {
if (thread.joinable()) {
thread.join();
}
}
state_.store(State::STOPPED, std::memory_order_relaxed);
}
}
void ThreadPoolImpl::submit(std::function<void()> task) {
if (State::STARTED == state_.load(std::memory_order_relaxed)) {
asio::post(context_, task);
} else {
SPDLOG_WARN("State of ThreadPool is not STARTED");
}
}
ROCKETMQ_NAMESPACE_END