blob: cf91a1d90aae0b138b0aeb639d7dec0ad1b09b59 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/threadpool.cc
// and modified by Doris
#include "util/threadpool.h"
#include <algorithm>
#include <cstdint>
#include <limits>
#include <ostream>
#include <thread>
#include <utility>
#include "absl/strings/substitute.h"
#include "common/exception.h"
#include "common/logging.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "util/scoped_cleanup.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"
namespace doris {
// The name of these varialbs will be useds as metric name in prometheus.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_active_threads, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_threads, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_execution_time_ns_total,
MetricUnit::NANOSECONDS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_execution_count_total, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_wait_worker_time_ns_total,
MetricUnit::NANOSECONDS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_task_wait_worker_count_total, MetricUnit::NOUNIT);
using namespace ErrorCode;
using std::string;
class FunctionRunnable : public Runnable {
public:
explicit FunctionRunnable(std::function<void()> func) : _func(std::move(func)) {}
void run() override { _func(); }
private:
std::function<void()> _func;
};
ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group)
: _name(std::move(name)),
_workload_group(std::move(workload_group)),
_min_threads(0),
_max_threads(std::thread::hardware_concurrency()),
_max_queue_size(std::numeric_limits<int>::max()),
_idle_timeout(std::chrono::milliseconds(500)) {}
ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
CHECK_GE(min_threads, 0);
_min_threads = min_threads;
return *this;
}
ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
CHECK_GT(max_threads, 0);
_max_threads = max_threads;
return *this;
}
ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
_max_queue_size = max_queue_size;
return *this;
}
ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(
std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl) {
_cgroup_cpu_ctl = cgroup_cpu_ctl;
return *this;
}
ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode,
int max_concurrency)
: _mode(mode),
_pool(pool),
_state(State::IDLE),
_active_threads(0),
_max_concurrency(max_concurrency),
_num_submitted_tasks(0),
_num_unsubmitted_tasks(0) {
if (max_concurrency == 1 && mode != ThreadPool::ExecutionMode::SERIAL) {
_mode = ThreadPool::ExecutionMode::SERIAL;
}
}
ThreadPoolToken::~ThreadPoolToken() {
shutdown();
_pool->release_token(this);
}
Status ThreadPoolToken::submit(std::shared_ptr<Runnable> r) {
return _pool->do_submit(std::move(r), this);
}
Status ThreadPoolToken::submit_func(std::function<void()> f) {
return submit(std::make_shared<FunctionRunnable>(std::move(f)));
}
void ThreadPoolToken::shutdown() {
std::unique_lock<std::mutex> l(_pool->_lock);
_pool->check_not_pool_thread_unlocked();
// Clear the queue under the lock, but defer the releasing of the tasks
// outside the lock, in case there are concurrent threads wanting to access
// the ThreadPool. The task's destructors may acquire locks, etc, so this
// also prevents lock inversions.
std::deque<ThreadPool::Task> to_release = std::move(_entries);
_pool->_total_queued_tasks -= to_release.size();
switch (state()) {
case State::IDLE:
// There were no tasks outstanding; we can quiesce the token immediately.
transition(State::QUIESCED);
break;
case State::RUNNING:
// There were outstanding tasks. If any are still running, switch to
// QUIESCING and wait for them to finish (the worker thread executing
// the token's last task will switch the token to QUIESCED). Otherwise,
// we can quiesce the token immediately.
// Note: this is an O(n) operation, but it's expected to be infrequent.
// Plus doing it this way (rather than switching to QUIESCING and waiting
// for a worker thread to process the queue entry) helps retain state
// transition symmetry with ThreadPool::shutdown.
for (auto it = _pool->_queue.begin(); it != _pool->_queue.end();) {
if (*it == this) {
it = _pool->_queue.erase(it);
} else {
it++;
}
}
if (_active_threads == 0) {
transition(State::QUIESCED);
break;
}
transition(State::QUIESCING);
[[fallthrough]];
case State::QUIESCING:
// The token is already quiescing. Just wait for a worker thread to
// switch it to QUIESCED.
_not_running_cond.wait(l, [this]() { return state() == State::QUIESCED; });
break;
default:
break;
}
}
void ThreadPoolToken::wait() {
std::unique_lock<std::mutex> l(_pool->_lock);
_pool->check_not_pool_thread_unlocked();
_not_running_cond.wait(l, [this]() { return !is_active(); });
}
void ThreadPoolToken::transition(State new_state) {
#ifndef NDEBUG
CHECK_NE(_state, new_state);
switch (_state) {
case State::IDLE:
CHECK(new_state == State::RUNNING || new_state == State::QUIESCED);
if (new_state == State::RUNNING) {
CHECK(!_entries.empty());
} else {
CHECK(_entries.empty());
CHECK_EQ(_active_threads, 0);
}
break;
case State::RUNNING:
CHECK(new_state == State::IDLE || new_state == State::QUIESCING ||
new_state == State::QUIESCED);
CHECK(_entries.empty());
if (new_state == State::QUIESCING) {
CHECK_GT(_active_threads, 0);
}
break;
case State::QUIESCING:
CHECK(new_state == State::QUIESCED);
CHECK_EQ(_active_threads, 0);
break;
case State::QUIESCED:
CHECK(false); // QUIESCED is a terminal state
break;
default:
throw Exception(Status::FatalError("Unknown token state: {}", _state));
}
#endif
// Take actions based on the state we're entering.
switch (new_state) {
case State::IDLE:
case State::QUIESCED:
_not_running_cond.notify_all();
break;
default:
break;
}
_state = new_state;
}
const char* ThreadPoolToken::state_to_string(State s) {
switch (s) {
case State::IDLE:
return "IDLE";
break;
case State::RUNNING:
return "RUNNING";
break;
case State::QUIESCING:
return "QUIESCING";
break;
case State::QUIESCED:
return "QUIESCED";
break;
}
return "<cannot reach here>";
}
bool ThreadPoolToken::need_dispatch() {
return _state == ThreadPoolToken::State::IDLE ||
(_mode == ThreadPool::ExecutionMode::CONCURRENT &&
_num_submitted_tasks < _max_concurrency);
}
ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
: _name(builder._name),
_workload_group(builder._workload_group),
_min_threads(builder._min_threads),
_max_threads(builder._max_threads),
_max_queue_size(builder._max_queue_size),
_idle_timeout(builder._idle_timeout),
_pool_status(Status::Uninitialized("The pool was not initialized.")),
_num_threads(0),
_num_threads_pending_start(0),
_active_threads(0),
_total_queued_tasks(0),
_cgroup_cpu_ctl(builder._cgroup_cpu_ctl),
_tokenless(new_token(ExecutionMode::CONCURRENT)),
_id(UniqueId::gen_uid()) {}
ThreadPool::~ThreadPool() {
// There should only be one live token: the one used in tokenless submission.
CHECK_EQ(1, _tokens.size()) << absl::Substitute(
"Threadpool $0 destroyed with $1 allocated tokens", _name, _tokens.size());
shutdown();
}
Status ThreadPool::try_create_thread(int thread_num, std::lock_guard<std::mutex>&) {
for (int i = 0; i < thread_num; i++) {
Status status = create_thread();
if (status.ok()) {
_num_threads_pending_start++;
} else {
LOG(WARNING) << "Thread pool " << _name << " failed to create thread: " << status;
return status;
}
}
return Status::OK();
}
Status ThreadPool::init() {
if (!_pool_status.is<UNINITIALIZED>()) {
return Status::NotSupported("The thread pool {} is already initialized", _name);
}
_pool_status = Status::OK();
{
std::lock_guard<std::mutex> l(_lock);
// create thread failed should not cause threadpool init failed,
// because thread can be created later such as when submit a task.
static_cast<void>(try_create_thread(_min_threads, l));
}
// _id of thread pool is used to make sure when we create thread pool with same name, we can
// get different _metric_entity
// If not, we will have problem when we deregister entity and register hook.
_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
fmt::format("thread_pool_{}", _name), {{"thread_pool_name", _name},
{"workload_group", _workload_group},
{"id", _id.to_string()}});
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_active_threads);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_threads);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size);
INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_queue_size);
INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_execution_time_ns_total);
INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_execution_count_total);
INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_wait_worker_time_ns_total);
INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_task_wait_worker_count_total);
INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed);
_metric_entity->register_hook("update", [this]() {
{
std::lock_guard<std::mutex> l(_lock);
if (!_pool_status.ok()) {
return;
}
}
thread_pool_active_threads->set_value(num_active_threads());
thread_pool_queue_size->set_value(get_queue_size());
thread_pool_max_queue_size->set_value(get_max_queue_size());
thread_pool_max_threads->set_value(max_threads());
});
return Status::OK();
}
void ThreadPool::shutdown() {
// Why access to doris_metrics is safe here?
// Since DorisMetrics is a singleton, it will be destroyed only after doris_main is exited.
// The shutdown/destroy of ThreadPool is guaranteed to take place before doris_main exits by
// ExecEnv::destroy().
DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity);
std::unique_lock<std::mutex> l(_lock);
check_not_pool_thread_unlocked();
// Note: this is the same error seen at submission if the pool is at
// capacity, so clients can't tell them apart. This isn't really a practical
// concern though because shutting down a pool typically requires clients to
// be quiesced first, so there's no danger of a client getting confused.
// Not print stack trace here
_pool_status = Status::Error<SERVICE_UNAVAILABLE, false>(
"The thread pool {} has been shut down.", _name);
// Clear the various queues under the lock, but defer the releasing
// of the tasks outside the lock, in case there are concurrent threads
// wanting to access the ThreadPool. The task's destructors may acquire
// locks, etc, so this also prevents lock inversions.
_queue.clear();
std::deque<std::deque<Task>> to_release;
for (auto* t : _tokens) {
if (!t->_entries.empty()) {
to_release.emplace_back(std::move(t->_entries));
}
switch (t->state()) {
case ThreadPoolToken::State::IDLE:
// The token is idle; we can quiesce it immediately.
t->transition(ThreadPoolToken::State::QUIESCED);
break;
case ThreadPoolToken::State::RUNNING:
// The token has tasks associated with it. If they're merely queued
// (i.e. there are no active threads), the tasks will have been removed
// above and we can quiesce immediately. Otherwise, we need to wait for
// the threads to finish.
t->transition(t->_active_threads > 0 ? ThreadPoolToken::State::QUIESCING
: ThreadPoolToken::State::QUIESCED);
break;
default:
break;
}
}
// The queues are empty. Wake any sleeping worker threads and wait for all
// of them to exit. Some worker threads will exit immediately upon waking,
// while others will exit after they finish executing an outstanding task.
_total_queued_tasks = 0;
while (!_idle_threads.empty()) {
_idle_threads.front().not_empty.notify_one();
_idle_threads.pop_front();
}
_no_threads_cond.wait(l, [this]() { return _num_threads + _num_threads_pending_start == 0; });
// All the threads have exited. Check the state of each token.
for (auto* t : _tokens) {
DCHECK(t->state() == ThreadPoolToken::State::IDLE ||
t->state() == ThreadPoolToken::State::QUIESCED);
}
}
std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode, int max_concurrency) {
std::lock_guard<std::mutex> l(_lock);
std::unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode, max_concurrency));
if (!_tokens.insert(t.get()).second) {
throw Exception(Status::InternalError("duplicate token"));
}
return t;
}
void ThreadPool::release_token(ThreadPoolToken* t) {
std::lock_guard<std::mutex> l(_lock);
CHECK(!t->is_active()) << absl::Substitute("Token with state $0 may not be released",
ThreadPoolToken::state_to_string(t->state()));
CHECK_EQ(1, _tokens.erase(t));
}
Status ThreadPool::submit(std::shared_ptr<Runnable> r) {
return do_submit(std::move(r), _tokenless.get());
}
Status ThreadPool::submit_func(std::function<void()> f) {
return submit(std::make_shared<FunctionRunnable>(std::move(f)));
}
Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token) {
DCHECK(token);
std::unique_lock<std::mutex> l(_lock);
if (!_pool_status.ok()) [[unlikely]] {
return _pool_status;
}
if (!token->may_submit_new_tasks()) [[unlikely]] {
return Status::Error<SERVICE_UNAVAILABLE>("Thread pool({}) token was shut down", _name);
}
// Size limit check.
int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - _active_threads +
static_cast<int64_t>(_max_queue_size) - _total_queued_tasks;
if (capacity_remaining < 1) {
thread_pool_submit_failed->increment(1);
return Status::Error<SERVICE_UNAVAILABLE>(
"Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name,
_num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks,
_max_queue_size);
}
// Should we create another thread?
// We assume that each current inactive thread will grab one item from the
// queue. If it seems like we'll need another thread, we create one.
//
// Rather than creating the thread here, while holding the lock, we defer
// it to down below. This is because thread creation can be rather slow
// (hundreds of milliseconds in some cases) and we'd like to allow the
// existing threads to continue to process tasks while we do so.
//
// In theory, a currently active thread could finish immediately after this
// calculation but before our new worker starts running. This would mean we
// created a thread we didn't really need. However, this race is unavoidable
// and harmless.
//
// Of course, we never create more than _max_threads threads no matter what.
int threads_from_this_submit =
token->is_active() && token->mode() == ExecutionMode::SERIAL ? 0 : 1;
int inactive_threads = _num_threads + _num_threads_pending_start - _active_threads;
int additional_threads =
static_cast<int>(_queue.size()) + threads_from_this_submit - inactive_threads;
bool need_a_thread = false;
if (additional_threads > 0 && _num_threads + _num_threads_pending_start < _max_threads) {
need_a_thread = true;
_num_threads_pending_start++;
}
Task task;
task.runnable = std::move(r);
task.submit_time_wather.start();
// Add the task to the token's queue.
ThreadPoolToken::State state = token->state();
DCHECK(state == ThreadPoolToken::State::IDLE || state == ThreadPoolToken::State::RUNNING);
token->_entries.emplace_back(std::move(task));
// When we need to execute the task in the token, we submit the token object to the queue.
// There are currently two places where tokens will be submitted to the queue:
// 1. When submitting a new task, if the token is still in the IDLE state,
// or the concurrency of the token has not reached the online level, it will be added to the queue.
// 2. When the dispatch thread finishes executing a task:
// 1. If it is a SERIAL token, and there are unsubmitted tasks, submit them to the queue.
// 2. If it is a CONCURRENT token, and there are still unsubmitted tasks, and the upper limit of concurrency is not reached,
// then submitted to the queue.
if (token->need_dispatch()) {
_queue.emplace_back(token);
++token->_num_submitted_tasks;
if (state == ThreadPoolToken::State::IDLE) {
token->transition(ThreadPoolToken::State::RUNNING);
}
} else {
++token->_num_unsubmitted_tasks;
}
_total_queued_tasks++;
// Wake up an idle thread for this task. Choosing the thread at the front of
// the list ensures LIFO semantics as idling threads are also added to the front.
//
// If there are no idle threads, the new task remains on the queue and is
// processed by an active thread (or a thread we're about to create) at some
// point in the future.
if (!_idle_threads.empty()) {
_idle_threads.front().not_empty.notify_one();
_idle_threads.pop_front();
}
l.unlock();
if (need_a_thread) {
Status status = create_thread();
if (!status.ok()) {
l.lock();
_num_threads_pending_start--;
if (_num_threads + _num_threads_pending_start == 0) {
// If we have no threads, we can't do any work.
return status;
}
// If we failed to create a thread, but there are still some other
// worker threads, log a warning message and continue.
LOG(WARNING) << "Thread pool " << _name
<< " failed to create thread: " << status.to_string();
}
}
return Status::OK();
}
void ThreadPool::wait() {
std::unique_lock<std::mutex> l(_lock);
check_not_pool_thread_unlocked();
_idle_cond.wait(l, [this]() { return _total_queued_tasks == 0 && _active_threads == 0; });
}
void ThreadPool::dispatch_thread() {
std::unique_lock<std::mutex> l(_lock);
if (!_threads.insert(Thread::current_thread()).second) {
throw Exception(Status::InternalError("duplicate token"));
}
DCHECK_GT(_num_threads_pending_start, 0);
_num_threads++;
_num_threads_pending_start--;
if (std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_sptr = _cgroup_cpu_ctl.lock()) {
static_cast<void>(cg_cpu_ctl_sptr->add_thread_to_cgroup());
}
// Owned by this worker thread and added/removed from _idle_threads as needed.
IdleThread me;
while (true) {
// Note: Status::Aborted() is used to indicate normal shutdown.
if (!_pool_status.ok()) {
VLOG_CRITICAL << "DispatchThread exiting: " << _pool_status.to_string();
break;
}
if (_num_threads + _num_threads_pending_start > _max_threads) {
break;
}
if (_queue.empty()) {
// There's no work to do, let's go idle.
//
// Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
_idle_threads.push_front(me);
SCOPED_CLEANUP({
// For some wake ups (i.e. shutdown or do_submit) this thread is
// guaranteed to be unlinked after being awakened. In others (i.e.
// spurious wake-up or Wait timeout), it'll still be linked.
if (me.is_linked()) {
_idle_threads.erase(_idle_threads.iterator_to(me));
}
});
if (me.not_empty.wait_for(l, _idle_timeout) == std::cv_status::timeout) {
// After much investigation, it appears that pthread condition variables have
// a weird behavior in which they can return ETIMEDOUT from timed_wait even if
// another thread did in fact signal. Apparently after a timeout there is some
// brief period during which another thread may actually grab the internal mutex
// protecting the state, signal, and release again before we get the mutex. So,
// we'll recheck the empty queue case regardless.
if (_queue.empty() && _num_threads + _num_threads_pending_start > _min_threads) {
VLOG_NOTICE << "Releasing worker thread from pool " << _name << " after "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
_idle_timeout)
.count()
<< "ms of idle time.";
break;
}
}
continue;
}
MonotonicStopWatch task_execution_time_watch;
task_execution_time_watch.start();
// Get the next token and task to execute.
ThreadPoolToken* token = _queue.front();
_queue.pop_front();
DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
DCHECK(!token->_entries.empty());
Task task = std::move(token->_entries.front());
thread_pool_task_wait_worker_time_ns_total->increment(
task.submit_time_wather.elapsed_time());
thread_pool_task_wait_worker_count_total->increment(1);
token->_entries.pop_front();
token->_active_threads++;
--_total_queued_tasks;
++_active_threads;
l.unlock();
// Execute the task
task.runnable->run();
// Destruct the task while we do not hold the lock.
//
// The task's destructor may be expensive if it has a lot of bound
// objects, and we don't want to block submission of the threadpool.
// In the worst case, the destructor might even try to do something
// with this threadpool, and produce a deadlock.
task.runnable.reset();
l.lock();
thread_pool_task_execution_time_ns_total->increment(
task_execution_time_watch.elapsed_time());
thread_pool_task_execution_count_total->increment(1);
// Possible states:
// 1. The token was shut down while we ran its task. Transition to QUIESCED.
// 2. The token has no more queued tasks. Transition back to IDLE.
// 3. The token has more tasks. Requeue it and transition back to RUNNABLE.
ThreadPoolToken::State state = token->state();
DCHECK(state == ThreadPoolToken::State::RUNNING ||
state == ThreadPoolToken::State::QUIESCING);
--token->_active_threads;
--token->_num_submitted_tasks;
// handle shutdown && idle
if (token->_active_threads == 0) {
if (state == ThreadPoolToken::State::QUIESCING) {
DCHECK(token->_entries.empty());
token->transition(ThreadPoolToken::State::QUIESCED);
} else if (token->_entries.empty()) {
token->transition(ThreadPoolToken::State::IDLE);
}
}
// We decrease _num_submitted_tasks holding lock, so the following DCHECK works.
DCHECK(token->_num_submitted_tasks < token->_max_concurrency);
// If token->state is running and there are unsubmitted tasks in the token, we put
// the token back.
if (token->_num_unsubmitted_tasks > 0 && state == ThreadPoolToken::State::RUNNING) {
// SERIAL: if _entries is not empty, then num_unsubmitted_tasks must be greater than 0.
// CONCURRENT: we have to check _num_unsubmitted_tasks because there may be at least 2
// threads are running for the token.
_queue.emplace_back(token);
++token->_num_submitted_tasks;
--token->_num_unsubmitted_tasks;
}
if (--_active_threads == 0) {
_idle_cond.notify_all();
}
}
// It's important that we hold the lock between exiting the loop and dropping
// _num_threads. Otherwise it's possible someone else could come along here
// and add a new task just as the last running thread is about to exit.
CHECK(l.owns_lock());
CHECK_EQ(_threads.erase(Thread::current_thread()), 1);
_num_threads--;
if (_num_threads + _num_threads_pending_start == 0) {
_no_threads_cond.notify_all();
// Sanity check: if we're the last thread exiting, the queue ought to be
// empty. Otherwise it will never get processed.
CHECK(_queue.empty());
DCHECK_EQ(0, _total_queued_tasks);
}
}
Status ThreadPool::create_thread() {
return Thread::create("thread pool", absl::Substitute("$0 [worker]", _name),
&ThreadPool::dispatch_thread, this, nullptr);
}
void ThreadPool::check_not_pool_thread_unlocked() {
Thread* current = Thread::current_thread();
if (_threads.contains(current)) {
throw Exception(
Status::FatalError("Thread belonging to thread pool {} with "
"name {} called pool function that would result in deadlock",
_name, current->name()));
}
}
Status ThreadPool::set_min_threads(int min_threads) {
std::lock_guard<std::mutex> l(_lock);
if (min_threads > _max_threads) {
// min threads can not be set greater than max threads
return Status::InternalError("set thread pool {} min_threads failed", _name);
}
_min_threads = min_threads;
if (min_threads > _num_threads + _num_threads_pending_start) {
int addition_threads = min_threads - _num_threads - _num_threads_pending_start;
RETURN_IF_ERROR(try_create_thread(addition_threads, l));
}
return Status::OK();
}
Status ThreadPool::set_max_threads(int max_threads) {
std::lock_guard<std::mutex> l(_lock);
DBUG_EXECUTE_IF("ThreadPool.set_max_threads.force_set", {
_max_threads = max_threads;
return Status::OK();
})
if (_min_threads > max_threads) {
// max threads can not be set less than min threads
return Status::InternalError("set thread pool {} max_threads failed", _name);
}
_max_threads = max_threads;
if (_max_threads > _num_threads + _num_threads_pending_start) {
int addition_threads = _max_threads - _num_threads - _num_threads_pending_start;
addition_threads = std::min(addition_threads, _total_queued_tasks);
RETURN_IF_ERROR(try_create_thread(addition_threads, l));
}
return Status::OK();
}
std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
return o << ThreadPoolToken::state_to_string(s);
}
} // namespace doris