// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/util/thread_pool.h"
#include <algorithm>
#include <condition_variable>
#include <deque>
#include <list>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
namespace arrow {
namespace internal {
Executor::~Executor() = default;
namespace {
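// A queued unit of work: the callable to run plus the cancellation state
// (stop token and optional stop callback) captured when the task was spawned.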
struct Task {
FnOnce<void()> callable;
StopToken stop_token;
Executor::StopCallback stop_callback;
};
} // namespace
struct SerialExecutor::State {
std::queue<Task> task_queue;
std::mutex mutex;
std::condition_variable wait_for_tasks;
bool finished{false};
};
SerialExecutor::SerialExecutor() : state_(new State()) {}
SerialExecutor::~SerialExecutor() {}
Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
StopToken stop_token, StopCallback&& stop_callback) {
// The serial task queue is consumed by a single thread (no concurrency there),
// but SpawnReal may be called from external threads (e.g. when transferring back
// from blocking I/O threads), so a mutex is needed to protect the queue.
{
std::lock_guard<std::mutex> lg(state_->mutex);
state_->task_queue.push(
Task{std::move(task), std::move(stop_token), std::move(stop_callback)});
}
state_->wait_for_tasks.notify_one();
return Status::OK();
}
void SerialExecutor::MarkFinished() {
std::lock_guard<std::mutex> lk(state_->mutex);
state_->finished = true;
// Keep the lock when notifying to avoid situations where the SerialExecutor
// would start being destroyed while the notify_one() call is still ongoing.
state_->wait_for_tasks.notify_one();
}
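// Run tasks on the calling thread until MarkFinished() is called. The lock is
// released while each task runs, so tasks may enqueue further tasks.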
void SerialExecutor::RunLoop() {
std::unique_lock<std::mutex> lk(state_->mutex);
while (!state_->finished) {
while (!state_->task_queue.empty()) {
Task task = std::move(state_->task_queue.front());
state_->task_queue.pop();
lk.unlock();
if (!task.stop_token.IsStopRequested()) {
std::move(task.callable)();
} else {
if (task.stop_callback) {
std::move(task.stop_callback)(task.stop_token.Poll());
}
// Can't break here because there may be cleanup tasks down the chain we still
// need to run.
}
lk.lock();
}
// In this case we must be waiting on work from external (e.g. I/O) executors. Wait
// for tasks to arrive (typically via transferred futures).
state_->wait_for_tasks.wait(
lk, [&] { return state_->finished || !state_->task_queue.empty(); });
}
}
struct ThreadPool::State {
State() = default;
// NOTE: in case locking becomes too expensive, we can investigate lock-free FIFOs
// such as https://github.com/cameron314/concurrentqueue
std::mutex mutex_;
std::condition_variable cv_;
std::condition_variable cv_shutdown_;
std::list<std::thread> workers_;
// Trashcan for finished threads
std::vector<std::thread> finished_workers_;
std::deque<Task> pending_tasks_;
// Desired number of threads
int desired_capacity_ = 0;
// Total number of tasks that are either queued or running
int tasks_queued_or_running_ = 0;
// Are we shutting down?
bool please_shutdown_ = false;
bool quick_shutdown_ = false;
};
// The worker loop is an independent function so that it can keep running
// after the ThreadPool is destroyed.
static void WorkerLoop(std::shared_ptr<ThreadPool::State> state,
std::list<std::thread>::iterator it) {
std::unique_lock<std::mutex> lock(state->mutex_);
// Since we hold the lock, `it` now points to the correct thread object
// (LaunchWorkersUnlocked has exited)
DCHECK_EQ(std::this_thread::get_id(), it->get_id());
// If too many threads, we should secede from the pool
const auto should_secede = [&]() -> bool {
return state->workers_.size() > static_cast<size_t>(state->desired_capacity_);
};
while (true) {
// By the time this thread is started, some tasks may have been pushed
// or shutdown could even have been requested. So we only wait on the
// condition variable at the end of the loop.
// Execute pending tasks if any
while (!state->pending_tasks_.empty() && !state->quick_shutdown_) {
// We re-check this at each loop iteration since the lock is released while
// running a task below, so desired_capacity_ may change concurrently.
if (should_secede()) {
break;
}
DCHECK_GE(state->tasks_queued_or_running_, 0);
{
Task task = std::move(state->pending_tasks_.front());
state->pending_tasks_.pop_front();
StopToken* stop_token = &task.stop_token;
lock.unlock();
if (!stop_token->IsStopRequested()) {
std::move(task.callable)();
} else {
if (task.stop_callback) {
std::move(task.stop_callback)(stop_token->Poll());
}
}
ARROW_UNUSED(std::move(task)); // release resources before waiting for lock
lock.lock();
}
state->tasks_queued_or_running_--;
}
// Now the queue is empty *or* a quick shutdown was requested *or* this thread
// chose to secede in the loop above
if (state->please_shutdown_ || should_secede()) {
break;
}
// Wait for next wakeup
state->cv_.wait(lock);
}
DCHECK_GE(state->tasks_queued_or_running_, 0);
// We're done. Move our thread object to the trashcan of finished
// workers. This has two motivations:
// 1) the thread object doesn't get destroyed before this function finishes
// (but we could call thread::detach() instead)
// 2) we can explicitly join() the trashcan threads to make sure all OS threads
// are exited before the ThreadPool is destroyed. Otherwise subtle
// timing conditions can lead to false positives with Valgrind.
DCHECK_EQ(std::this_thread::get_id(), it->get_id());
state->finished_workers_.push_back(std::move(*it));
state->workers_.erase(it);
if (state->please_shutdown_) {
// Notify the function waiting in Shutdown().
state->cv_shutdown_.notify_one();
}
}
ThreadPool::ThreadPool()
: sp_state_(std::make_shared<ThreadPool::State>()),
state_(sp_state_.get()),
shutdown_on_destroy_(true) {
#ifndef _WIN32
pid_ = getpid();
#endif
}
ThreadPool::~ThreadPool() {
if (shutdown_on_destroy_) {
ARROW_UNUSED(Shutdown(/*wait=*/false));
}
}
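// Detect whether this process is a fork() child and, if so, rebuild the pool
// state: the parent's worker threads do not exist in the child, so a fresh
// State is allocated and the desired capacity restored (workers are respawned
// lazily as new tasks arrive).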
void ThreadPool::ProtectAgainstFork() {
#ifndef _WIN32
pid_t current_pid = getpid();
if (pid_ != current_pid) {
// Reinitialize internal state in child process after fork().
// Ideally we would use pthread_atfork(), but that doesn't allow
// storing an argument, hence we'd need to maintain a list of all
// existing ThreadPools.
int capacity = state_->desired_capacity_;
auto new_state = std::make_shared<ThreadPool::State>();
new_state->please_shutdown_ = state_->please_shutdown_;
new_state->quick_shutdown_ = state_->quick_shutdown_;
pid_ = current_pid;
sp_state_ = new_state;
state_ = sp_state_.get();
// Launch worker threads anew
if (!state_->please_shutdown_) {
ARROW_UNUSED(SetCapacity(capacity));
}
}
#endif
}
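// Adjust the desired number of workers. Growing launches just enough threads
// to cover the currently pending tasks; shrinking is lazy, excess workers
// secede on their next pass through WorkerLoop.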
Status ThreadPool::SetCapacity(int threads) {
ProtectAgainstFork();
std::unique_lock<std::mutex> lock(state_->mutex_);
if (state_->please_shutdown_) {
return Status::Invalid("operation forbidden during or after shutdown");
}
if (threads <= 0) {
return Status::Invalid("ThreadPool capacity must be > 0");
}
CollectFinishedWorkersUnlocked();
state_->desired_capacity_ = threads;
// See if we need to increase or decrease the number of running threads
const int required = std::min(static_cast<int>(state_->pending_tasks_.size()),
threads - static_cast<int>(state_->workers_.size()));
if (required > 0) {
// Some tasks are pending, spawn the number of needed threads immediately
LaunchWorkersUnlocked(required);
} else if (required < 0) {
// Excess threads are running, wake them so that they stop
state_->cv_.notify_all();
}
return Status::OK();
}
int ThreadPool::GetCapacity() {
ProtectAgainstFork();
std::unique_lock<std::mutex> lock(state_->mutex_);
return state_->desired_capacity_;
}
int ThreadPool::GetNumTasks() {
ProtectAgainstFork();
std::unique_lock<std::mutex> lock(state_->mutex_);
return state_->tasks_queued_or_running_;
}
int ThreadPool::GetActualCapacity() {
ProtectAgainstFork();
std::unique_lock<std::mutex> lock(state_->mutex_);
return static_cast<int>(state_->workers_.size());
}
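// With wait=true, all pending tasks are run before the workers exit; with
// wait=false, pending tasks are dropped and only tasks already started are
// allowed to finish.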
Status ThreadPool::Shutdown(bool wait) {
ProtectAgainstFork();
std::unique_lock<std::mutex> lock(state_->mutex_);
if (state_->please_shutdown_) {
return Status::Invalid("Shutdown() already called");
}
state_->please_shutdown_ = true;
state_->quick_shutdown_ = !wait;
state_->cv_.notify_all();
state_->cv_shutdown_.wait(lock, [this] { return state_->workers_.empty(); });
if (!state_->quick_shutdown_) {
DCHECK_EQ(state_->pending_tasks_.size(), 0);
} else {
state_->pending_tasks_.clear();
}
CollectFinishedWorkersUnlocked();
return Status::OK();
}
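// Join threads that moved themselves into finished_workers_ at the end of
// WorkerLoop, ensuring their OS threads have fully exited.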
void ThreadPool::CollectFinishedWorkersUnlocked() {
for (auto& thread : state_->finished_workers_) {
// Make sure OS thread has exited
thread.join();
}
state_->finished_workers_.clear();
}
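// Note: std::list iterators stay valid as other elements are added or erased,
// so each worker can safely identify itself by an iterator into workers_.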
void ThreadPool::LaunchWorkersUnlocked(int threads) {
std::shared_ptr<State> state = sp_state_;
for (int i = 0; i < threads; i++) {
state_->workers_.emplace_back();
auto it = --(state_->workers_.end());
*it = std::thread([state, it] { WorkerLoop(state, it); });
}
}
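// Enqueue a task, lazily launching an extra worker when every current worker
// may already be busy and the pool is still below its desired capacity.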
Status ThreadPool::SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken stop_token,
StopCallback&& stop_callback) {
{
ProtectAgainstFork();
std::lock_guard<std::mutex> lock(state_->mutex_);
if (state_->please_shutdown_) {
return Status::Invalid("operation forbidden during or after shutdown");
}
CollectFinishedWorkersUnlocked();
state_->tasks_queued_or_running_++;
if (static_cast<int>(state_->workers_.size()) < state_->tasks_queued_or_running_ &&
state_->desired_capacity_ > static_cast<int>(state_->workers_.size())) {
// Fewer workers than tasks in flight and still below the desired capacity:
// launch a new worker
LaunchWorkersUnlocked(/*threads=*/1);
}
state_->pending_tasks_.push_back(
{std::move(task), std::move(stop_token), std::move(stop_callback)});
}
state_->cv_.notify_one();
return Status::OK();
}
Result<std::shared_ptr<ThreadPool>> ThreadPool::Make(int threads) {
auto pool = std::shared_ptr<ThreadPool>(new ThreadPool());
RETURN_NOT_OK(pool->SetCapacity(threads));
return pool;
}
Result<std::shared_ptr<ThreadPool>> ThreadPool::MakeEternal(int threads) {
ARROW_ASSIGN_OR_RAISE(auto pool, Make(threads));
// On Windows, the ThreadPool destructor may be called after non-main threads
// have been killed by the OS, and hang in a condition variable.
// On Unix, we want to avoid leak reports by Valgrind.
#ifdef _WIN32
pool->shutdown_on_destroy_ = false;
#endif
return pool;
}
// ----------------------------------------------------------------------
// Global thread pool
static int ParseOMPEnvVar(const char* name) {
// OMP_NUM_THREADS is a comma-separated list of positive integers.
// We are only interested in the first (top-level) number.
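// For example, "4,2" (nested parallelism levels) parses to 4; an unset, empty
// or unparsable value yields 0, meaning "not configured".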
auto result = GetEnvVar(name);
if (!result.ok()) {
return 0;
}
auto str = *std::move(result);
auto first_comma = str.find_first_of(',');
if (first_comma != std::string::npos) {
str = str.substr(0, first_comma);
}
try {
return std::max(0, std::stoi(str));
} catch (...) {
return 0;
}
}
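// Resolution order: OMP_NUM_THREADS if set, otherwise the detected hardware
// concurrency, optionally capped by OMP_THREAD_LIMIT, with a fallback of 4
// when nothing can be determined.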
int ThreadPool::DefaultCapacity() {
int capacity = ParseOMPEnvVar("OMP_NUM_THREADS");
if (capacity == 0) {
capacity = static_cast<int>(std::thread::hardware_concurrency());
}
const int limit = ParseOMPEnvVar("OMP_THREAD_LIMIT");
if (limit > 0) {
capacity = std::min(limit, capacity);
}
if (capacity == 0) {
ARROW_LOG(WARNING) << "Failed to determine the number of available threads, "
"using a hardcoded arbitrary value";
capacity = 4;
}
return capacity;
}
// Helper for the singleton pattern
std::shared_ptr<ThreadPool> ThreadPool::MakeCpuThreadPool() {
auto maybe_pool = ThreadPool::MakeEternal(ThreadPool::DefaultCapacity());
if (!maybe_pool.ok()) {
maybe_pool.status().Abort("Failed to create global CPU thread pool");
}
return *std::move(maybe_pool);
}
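// Meyers singleton: initialization of the function-local static is guaranteed
// to be thread-safe (C++11 and later).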
ThreadPool* GetCpuThreadPool() {
static std::shared_ptr<ThreadPool> singleton = ThreadPool::MakeCpuThreadPool();
return singleton.get();
}
Status RunSynchronouslyVoid(FnOnce<Future<arrow::detail::Empty>(Executor*)> get_future,
bool use_threads) {
return RunSynchronously(std::move(get_future), use_threads).status();
}
} // namespace internal
int GetCpuThreadPoolCapacity() { return internal::GetCpuThreadPool()->GetCapacity(); }
Status SetCpuThreadPoolCapacity(int threads) {
return internal::GetCpuThreadPool()->SetCapacity(threads);
}
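// A minimal usage sketch (for illustration only; Submit() and Future are
// declared in thread_pool.h / future.h, not defined in this file):
//
//   ARROW_ASSIGN_OR_RAISE(auto pool, internal::ThreadPool::Make(/*threads=*/4));
//   ARROW_ASSIGN_OR_RAISE(auto fut, pool->Submit([] { return 42; }));
//   int value = fut.result().ValueOrDie();  // blocks until the task has run
//   ARROW_UNUSED(pool->Shutdown(/*wait=*/true));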
} // namespace arrow