blob: 5bbd3f63d3a93d18b6b757f0862fcc9faaf91a73 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_THREAD_POOL_H
#define LIBMINIFI_INCLUDE_THREAD_POOL_H
#include <chrono>
#include <sstream>
#include <iostream>
#include <atomic>
#include <mutex>
#include <map>
#include <vector>
#include <queue>
#include <future>
#include <thread>
#include <functional>
#include "BackTrace.h"
#include "core/expect.h"
#include "controllers/ThreadManagementService.h"
#include "concurrentqueue.h"
#include "core/controller/ControllerService.h"
#include "core/controller/ControllerServiceProvider.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace utils {
/**
* Worker task helper that determines
* whether or not we will run
*/
template<typename T>
class AfterExecute {
public:
virtual ~AfterExecute() {
}
explicit AfterExecute() {
}
explicit AfterExecute(AfterExecute &&other) {
}
virtual bool isFinished(const T &result) = 0;
virtual bool isCancelled(const T &result) = 0;
/**
* Time to wait before re-running this task if necessary
* @return milliseconds since epoch after which we are eligible to re-run this task.
*/
virtual int64_t wait_time() = 0;
};
/**
* Worker task
* purpose: Provides a wrapper for the functor
* and returns a future based on the template argument.
*/
template<typename T>
class Worker {
public:
explicit Worker(std::function<T()> &task, const std::string &identifier, std::unique_ptr<AfterExecute<T>> run_determinant)
: identifier_(identifier),
time_slice_(0),
task(task),
run_determinant_(std::move(run_determinant)) {
promise = std::make_shared<std::promise<T>>();
}
explicit Worker(std::function<T()> &task, const std::string &identifier)
: identifier_(identifier),
time_slice_(0),
task(task),
run_determinant_(nullptr) {
promise = std::make_shared<std::promise<T>>();
}
explicit Worker(const std::string identifier = "")
: identifier_(identifier),
time_slice_(0) {
}
virtual ~Worker() {
}
/**
* Move constructor for worker tasks
*/
Worker(Worker &&other)
: identifier_(std::move(other.identifier_)),
time_slice_(std::move(other.time_slice_)),
task(std::move(other.task)),
run_determinant_(std::move(other.run_determinant_)),
promise(other.promise) {
}
/**
* Runs the task and takes the output from the functor
* setting the result into the promise
* @return whether or not to continue running
* false == finished || error
* true == run again
*/
virtual bool run() {
T result = task();
if (run_determinant_ == nullptr || (run_determinant_->isFinished(result) || run_determinant_->isCancelled(result))) {
promise->set_value(result);
return false;
}
time_slice_ = increment_time(run_determinant_->wait_time());
return true;
}
virtual void setIdentifier(const std::string identifier) {
identifier_ = identifier;
}
virtual uint64_t getTimeSlice() {
return time_slice_;
}
virtual uint64_t getWaitTime() {
return run_determinant_->wait_time();
}
Worker<T>(const Worker<T>&) = delete;
Worker<T>& operator =(const Worker<T>&) = delete;
Worker<T>& operator =(Worker<T> &&);
std::shared_ptr<std::promise<T>> getPromise();
const std::string &getIdentifier() {
return identifier_;
}
protected:
inline uint64_t increment_time(const uint64_t &time) {
std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
return millis + time;
}
std::string identifier_;
uint64_t time_slice_;
std::function<T()> task;
std::unique_ptr<AfterExecute<T>> run_determinant_;
std::shared_ptr<std::promise<T>> promise;
};
template<typename T>
class WorkerComparator {
public:
bool operator()(Worker<T> &a, Worker<T> &b) {
return a.getTimeSlice() < b.getTimeSlice();
}
};
template<typename T>
Worker<T>& Worker<T>::operator =(Worker<T> && other) {
task = std::move(other.task);
promise = other.promise;
time_slice_ = std::move(other.time_slice_);
identifier_ = std::move(other.identifier_);
run_determinant_ = std::move(other.run_determinant_);
return *this;
}
template<typename T>
std::shared_ptr<std::promise<T>> Worker<T>::getPromise() {
return promise;
}
class WorkerThread {
public:
explicit WorkerThread(std::thread thread, const std::string &name = "NamelessWorker")
: is_running_(false),
thread_(std::move(thread)),
name_(name) {
}
WorkerThread(const std::string &name = "NamelessWorker")
: is_running_(false),
name_(name) {
}
std::atomic<bool> is_running_;
std::thread thread_;
std::string name_;
};
/**
* Thread pool
* Purpose: Provides a thread pool with basic functionality similar to
* ThreadPoolExecutor
* Design: Locked control over a manager thread that controls the worker threads
*/
template<typename T>
class ThreadPool {
public:
ThreadPool(int max_worker_threads = 2, bool daemon_threads = false, const std::shared_ptr<core::controller::ControllerServiceProvider> &controller_service_provider = nullptr,
const std::string &name = "NamelessPool")
: daemon_threads_(daemon_threads),
thread_reduction_count_(0),
max_worker_threads_(max_worker_threads),
adjust_threads_(false),
running_(false),
controller_service_provider_(controller_service_provider),
name_(name) {
current_workers_ = 0;
task_count_ = 0;
thread_manager_ = nullptr;
}
ThreadPool(const ThreadPool<T> &&other)
: daemon_threads_(std::move(other.daemon_threads_)),
thread_reduction_count_(0),
max_worker_threads_(std::move(other.max_worker_threads_)),
adjust_threads_(false),
running_(false),
controller_service_provider_(std::move(other.controller_service_provider_)),
thread_manager_(std::move(other.thread_manager_)),
name_(std::move(other.name_)) {
current_workers_ = 0;
task_count_ = 0;
}
~ThreadPool() {
shutdown();
}
/**
* Execute accepts a worker task and returns
* a future
* @param task this thread pool will subsume ownership of
* the worker task
* @param future future to move new promise to
* @return true if future can be created and thread pool is in a running state.
*/
bool execute(Worker<T> &&task, std::future<T> &future);
/**
* attempts to stop tasks with the provided identifier.
* @param identifier for worker tasks. Note that these tasks won't
* immediately stop.
*/
void stopTasks(const std::string &identifier);
/**
* Returns true if a task is running.
*/
bool isRunning(const std::string &identifier) {
return task_status_[identifier] == true;
}
std::vector<BackTrace> getTraces() {
std::vector<BackTrace> traces;
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
std::unique_lock<std::mutex> wlock(worker_queue_mutex_);
// while we may be checking if running, we don't want to
// use the threads outside of the manager mutex's lock -- therefore we will
// obtain a lock so we can keep the threads in memory
if (running_) {
for (const auto &worker : thread_queue_) {
if (worker->is_running_)
traces.emplace_back(TraceResolver::getResolver().getBackTrace(worker->name_, worker->thread_.native_handle()));
}
}
return traces;
}
/**
* Starts the Thread Pool
*/
void start();
/**
* Shutdown the thread pool and clear any
* currently running activities
*/
void shutdown();
/**
* Set the max concurrent tasks. When this is done
* we must start and restart the thread pool if
* the number of tasks is less than the currently configured number
*/
void setMaxConcurrentTasks(uint16_t max) {
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
if (running_) {
shutdown();
}
max_worker_threads_ = max;
if (!running_)
start();
}
ThreadPool<T> operator=(const ThreadPool<T> &other) = delete;
ThreadPool(const ThreadPool<T> &other) = delete;
ThreadPool<T> &operator=(ThreadPool<T> &&other) {
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
if (other.running_) {
other.shutdown();
}
if (running_) {
shutdown();
}
max_worker_threads_ = std::move(other.max_worker_threads_);
daemon_threads_ = std::move(other.daemon_threads_);
current_workers_ = 0;
thread_reduction_count_ = 0;
thread_queue_ = std::move(other.thread_queue_);
worker_queue_ = std::move(other.worker_queue_);
controller_service_provider_ = std::move(other.controller_service_provider_);
thread_manager_ = std::move(other.thread_manager_);
adjust_threads_ = false;
if (!running_) {
start();
}
name_ = other.name_;
return *this;
}
protected:
std::thread createThread(std::function<void()> &&functor) {
return std::thread([ functor ]() mutable {
functor();
});
}
/**
* Drain will notify tasks to stop following notification
*/
void drain() {
while (current_workers_ > 0) {
tasks_available_.notify_one();
}
}
// determines if threads are detached
bool daemon_threads_;
std::atomic<int> thread_reduction_count_;
// max worker threads
int max_worker_threads_;
// current worker tasks.
std::atomic<int> current_workers_;
std::atomic<int> task_count_;
// thread queue
std::vector<std::shared_ptr<WorkerThread>> thread_queue_;
// manager thread
std::thread manager_thread_;
// conditional that's used to adjust the threads
std::atomic<bool> adjust_threads_;
// atomic running boolean
std::atomic<bool> running_;
// controller service provider
std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider_;
// integrated power manager
std::shared_ptr<controllers::ThreadManagementService> thread_manager_;
// thread queue for the recently deceased threads.
moodycamel::ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
// worker queue of worker objects
moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
std::priority_queue<Worker<T>, std::vector<Worker<T>>, WorkerComparator<T>> worker_priority_queue_;
// notification for available work
std::condition_variable tasks_available_;
// map to identify if a task should be
std::map<std::string, bool> task_status_;
// manager mutex
std::recursive_mutex manager_mutex_;
// work queue mutex
std::mutex worker_queue_mutex_;
// thread pool name
std::string name_;
/**
* Call for the manager to start worker threads
*/
void manageWorkers();
/**
* Function to adjust the workers up and down.
*/
void adjustWorkers(int count);
/**
* Runs worker tasks
*/
void run_tasks(std::shared_ptr<WorkerThread> thread);
};
template<typename T>
bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
{
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
task_status_[task.getIdentifier()] = true;
}
future = std::move(task.getPromise()->get_future());
bool enqueued = worker_queue_.enqueue(std::move(task));
if (running_) {
tasks_available_.notify_one();
}
task_count_++;
return enqueued;
}
template<typename T>
void ThreadPool<T>::manageWorkers() {
for (int i = 0; i < max_worker_threads_; i++) {
std::stringstream thread_name;
thread_name << name_ << " #" << i;
auto worker_thread = std::make_shared<WorkerThread>(thread_name.str());
worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
thread_queue_.push_back(worker_thread);
current_workers_++;
}
if (daemon_threads_) {
for (auto &thread : thread_queue_) {
thread->thread_.detach();
}
}
// likely don't have a thread manager
if (LIKELY(nullptr != thread_manager_)) {
while (running_) {
auto waitperiod = std::chrono::milliseconds(1) * 500;
{
if (thread_manager_->isAboveMax(current_workers_)) {
auto max = thread_manager_->getMaxConcurrentTasks();
auto differential = current_workers_ - max;
thread_reduction_count_ += differential;
} else if (thread_manager_->shouldReduce()) {
if (current_workers_ > 1)
thread_reduction_count_++;
thread_manager_->reduce();
} else if (thread_manager_->canIncrease() && max_worker_threads_ - current_workers_ > 0) { // increase slowly
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
auto worker_thread = std::make_shared<WorkerThread>();
worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
if (daemon_threads_) {
worker_thread->thread_.detach();
}
thread_queue_.push_back(worker_thread);
current_workers_++;
}
}
{
std::shared_ptr<WorkerThread> thread_ref;
while (deceased_thread_queue_.try_dequeue(thread_ref)) {
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
if (thread_ref->thread_.joinable())
thread_ref->thread_.join();
thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), thread_ref), thread_queue_.end());
}
}
std::this_thread::sleep_for(waitperiod);
}
} else {
for (auto &thread : thread_queue_) {
if (thread->thread_.joinable())
thread->thread_.join();
}
}
}
template<typename T>
void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) {
auto waitperiod = std::chrono::milliseconds(1) * 100;
thread->is_running_ = true;
uint64_t wait_decay_ = 0;
uint64_t yield_backoff = 10; // start at 10 ms
while (running_.load()) {
if (UNLIKELY(thread_reduction_count_ > 0)) {
if (--thread_reduction_count_ >= 0) {
deceased_thread_queue_.enqueue(thread);
thread->is_running_ = false;
break;
} else {
thread_reduction_count_++;
}
}
// if we exceed 500ms of wait due to not being able to run any tasks and there are tasks available, meaning
// they are eligible to run per the fact that the thread pool isn't shut down and the tasks are in a runnable state
// BUT they've been continually timesliced, we will lower the wait decay to 100ms and continue incrementing from
// there. This ensures we don't have arbitrarily long sleep cycles.
if (wait_decay_ > 500000000L) {
wait_decay_ = 100000000L;
}
// if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
// we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
// wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
// be more likely to run. This is intentional.
if (wait_decay_ > 2000) {
std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
}
if (current_workers_ < max_worker_threads_) {
// we are in a reduced state. due to thread management
// let's institute a backoff up to 500ms
if (yield_backoff < 500) {
yield_backoff += 10;
}
std::this_thread::sleep_for(std::chrono::milliseconds(yield_backoff));
} else {
yield_backoff = 10;
}
Worker<T> task;
bool prioritized_task = false;
if (!prioritized_task) {
if (!worker_queue_.try_dequeue(task)) {
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
if (worker_priority_queue_.size() > 0) {
// this is safe as we are going to immediately pop the queue
while (!worker_priority_queue_.empty()) {
task = std::move(const_cast<Worker<T>&>(worker_priority_queue_.top()));
worker_priority_queue_.pop();
worker_queue_.enqueue(std::move(task));
continue;
}
}
tasks_available_.wait_for(lock, waitperiod);
continue;
} else {
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
if (!task_status_[task.getIdentifier()]) {
continue;
}
}
bool wait_to_run = false;
if (task.getTimeSlice() > 1) {
double wt = (double) task.getWaitTime();
auto now = std::chrono::system_clock::now().time_since_epoch();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
// if our differential is < 10% of the wait time we will not put the task into a wait state
// since requeuing will break the time slice contract.
if ((double) task.getTimeSlice() > ms && ((double) (task.getTimeSlice() - ms)) > (wt * .10)) {
wait_to_run = true;
}
}
// if we have to wait we re-queue the worker.
if (wait_to_run) {
{
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
if (!task_status_[task.getIdentifier()]) {
continue;
}
// put it on the priority queue
worker_priority_queue_.push(std::move(task));
}
wait_decay_ += 25;
continue;
}
}
const bool task_renew = task.run();
wait_decay_ = 0;
if (task_renew) {
if (UNLIKELY(task_count_ > current_workers_)) {
// even if we have more work to do we will not
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
if (!task_status_[task.getIdentifier()]) {
continue;
}
worker_priority_queue_.push(std::move(task));
} else {
worker_queue_.enqueue(std::move(task));
}
}
}
current_workers_--;
}
template<typename T>
void ThreadPool<T>::start() {
if (nullptr != controller_service_provider_) {
auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager");
thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr;
} else {
thread_manager_ = nullptr;
}
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
if (!running_) {
running_ = true;
manager_thread_ = std::move(std::thread(&ThreadPool::manageWorkers, this));
if (worker_queue_.size_approx() > 0) {
tasks_available_.notify_all();
}
}
}
template<typename T>
void ThreadPool<T>::stopTasks(const std::string &identifier) {
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
task_status_[identifier] = false;
}
template<typename T>
void ThreadPool<T>::shutdown() {
if (running_.load()) {
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
running_.store(false);
drain();
task_status_.clear();
if (manager_thread_.joinable())
manager_thread_.join();
{
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
for(const auto &thread : thread_queue_){
if (thread->thread_.joinable())
thread->thread_.join();
}
thread_queue_.clear();
current_workers_ = 0;
while (worker_queue_.size_approx() > 0) {
Worker<T> task;
worker_queue_.try_dequeue(task);
}
}
}
}
} /* namespace utils */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
#endif