blob: 1da8a08f90d234d6eeb53daaebea502cba4bc707 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <string>
#include <type_traits>
#include "util/blocking_priority_queue.hpp"
#include "util/blocking_queue.hpp"
#include "util/thread.h"
#include "util/thread_group.h"
namespace doris {
// Simple thread pool which runs tasks in parallel. Tasks are placed on a blocking queue
// by offer()/try_offer(); each task taken off the queue is run by a single worker thread.
// -- Priority: when true, pending tasks are stored in a BlockingPriorityQueue<Task>,
//      otherwise in a BlockingQueue<Task>.
template <bool Priority = false>
class WorkThreadPool {
public:
    // Signature of a unit of work. It takes no arguments and returns nothing, so any
    // state the work needs must be captured by the callable itself.
    using WorkFunction = std::function<void()>;

    // A unit of work together with the priority it is queued with.
    struct Task {
    public:
        int priority;
        WorkFunction work_function;
        // Tasks are ordered by their priority value only.
        bool operator<(const Task& o) const { return priority < o.priority; }
        // Bumps the priority value by 2.
        // NOTE(review): presumably used by BlockingPriorityQueue to age waiting tasks;
        // confirm against the queue implementation.
        Task& operator++() {
            priority += 2;
            return *this;
        }
    };

    using WorkQueue =
            std::conditional_t<Priority, BlockingPriorityQueue<Task>, BlockingQueue<Task>>;

    // Creates a new thread pool and starts num_threads worker threads.
    //  -- num_threads: how many threads are part of this pool
    //  -- queue_size: the capacity passed to the work queue. Per offer(), a blocking
    //       offer waits while the queue is full.
    //  -- name: reported by get_info() and used as the name of every worker thread
    WorkThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name)
            : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) {
        // Use an unsigned counter so the comparison with num_threads is not a
        // signed/unsigned mix; work_thread() still receives an int id.
        for (uint32_t i = 0; i < num_threads; ++i) {
            const int thread_id = static_cast<int>(i);
            _threads.create_thread([this, thread_id] { work_thread(thread_id); });
        }
    }

    // Destructor ensures that all threads are terminated before this object is freed
    // (otherwise they may continue to run and reference member variables).
    // Virtual calls made from a destructor resolve to this class's shutdown()/join(),
    // not to overrides in derived classes.
    virtual ~WorkThreadPool() {
        shutdown();
        join();
    }

    // Blocking operation that puts a task on the queue. If the queue is full, blocks
    // until there is capacity available.
    //
    // The task is copied into the work queue and may run at any time in the future.
    // Therefore the caller needs to ensure that any data referenced by the task's work
    // function remains valid until the task has been processed, and it is up to the
    // caller to provide their own signalling mechanism to detect this (or to wait until
    // after drain_and_shutdown() returns).
    //
    // Returns the result of the queue's blocking_put(): true if the task was added to
    // the queue, false otherwise (which typically means that the thread pool has already
    // been shut down).
    virtual bool offer(Task task) { return _work_queue.blocking_put(task); }

    // Same as offer(Task), wrapping func in a Task with priority 0.
    virtual bool offer(WorkFunction func) {
        WorkThreadPool::Task task = {0, func};
        return _work_queue.blocking_put(task);
    }

    // Wraps func in a Task with priority 0 and hands it to the queue's try_put() instead
    // of blocking_put(). Returns whatever try_put() returns.
    virtual bool try_offer(WorkFunction func) {
        WorkThreadPool::Task task = {0, func};
        return _work_queue.try_put(task);
    }

    // Shuts the thread pool down: sets the shutdown flag and shuts down the work queue.
    // Worker threads leave their loop once they observe the flag, i.e. after they have
    // finished the task they are currently running.
    // Returns once the shutdown flag has been set, does not wait for the threads to
    // terminate.
    virtual void shutdown() {
        _shutdown = true;
        _work_queue.shutdown();
    }

    // Blocks until all threads are finished. shutdown() does not need to have been
    // called beforehand, since it may be called on a separate thread.
    virtual void join() { static_cast<void>(_threads.join_all()); }

    // Number of tasks currently held by the work queue.
    virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }

    // Number of worker threads that are currently running a task.
    virtual uint32_t get_active_threads() const { return _active_threads; }

    // Blocks until the work queue is empty, and then calls shutdown() to stop the worker
    // threads and join() to wait until they are finished.
    // Any work offered during drain_and_shutdown() may or may not be processed.
    virtual void drain_and_shutdown() {
        {
            std::unique_lock<std::mutex> l(_lock);
            // The predicate is evaluated with _lock held, and workers take _lock before
            // notifying, so a notification cannot slip in between the check and the wait.
            _empty_cv.wait(l, [this] { return _work_queue.get_size() == 0; });
        }
        shutdown();
        join();
    }

    // Returns a one-line, human-readable summary of the pool: its kind, name, queue
    // usage, active/total thread counts and the queue's accumulated wait times.
    std::string get_info() const {
        return (Priority ? "PriorityThreadPool" : "FifoThreadPool") +
               fmt::format(
                       "(name={}, queue_size={}/{}, active_thread={}/{}, "
                       "total_get_wait_time={}, total_put_wait_time={})",
                       _name, get_queue_size(), _work_queue.get_capacity(), _active_threads,
                       _threads.size(), _work_queue.total_get_wait_time(),
                       _work_queue.total_put_wait_time());
    }

protected:
    // True once shutdown() has been called.
    virtual bool is_shutdown() { return _shutdown; }

    // Collection of worker threads that process work from the queue.
    ThreadGroup _threads;

    // Guards _empty_cv
    std::mutex _lock;

    // Signalled when the queue becomes empty
    std::condition_variable _empty_cv;

private:
    // Driver method for each thread in the pool. Continues to read work from the queue
    // until the pool is shutdown.
    //  -- thread_id: index of this worker (0 to num_threads - 1); currently unused.
    void work_thread([[maybe_unused]] int thread_id) {
        Thread::set_self_name(_name);
        while (!is_shutdown()) {
            Task task;
            if (_work_queue.blocking_get(&task)) {
                _active_threads++;
                task.work_function();
                _active_threads--;
            }
            if (_work_queue.get_size() == 0) {
                // Notify while holding _lock. Without the lock, drain_and_shutdown()
                // could see a non-empty queue, miss this notification before it starts
                // waiting, and then block forever.
                std::lock_guard<std::mutex> guard(_lock);
                _empty_cv.notify_all();
            }
        }
    }

    WorkQueue _work_queue;

    // Set to true when threads should stop doing work and terminate.
    std::atomic<bool> _shutdown;

    std::string _name;

    // Number of worker threads currently executing a task.
    std::atomic<int> _active_threads;
};
// Convenience aliases for the two queueing flavours of WorkThreadPool.
// FifoThreadPool keeps its pending tasks in a BlockingQueue.
using FifoThreadPool = WorkThreadPool<false>;
// PriorityThreadPool keeps its pending tasks in a BlockingPriorityQueue.
using PriorityThreadPool = WorkThreadPool<true>;
} // namespace doris