MINIFICPP-378: Detach new threads when daemon threads are enabled
Resolve an issue in which single tasks could be starved on a reduced thread pool
This closes #247.
Signed-off-by: Aldrin Piri <aldrin@apache.org>
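
For context, below is a minimal standalone sketch of the two behaviors this patch introduces. The class is hypothetical and illustrative only; daemon_threads_, task_count_, and current_workers_ are named after the real ThreadPool members touched in the diff, but nothing else here mirrors the shipped implementation.

    // Hypothetical sketch -- not the shipped ThreadPool class.
    #include <atomic>
    #include <thread>
    #include <utility>
    #include <vector>

    struct PoolSketch {
      std::atomic<int> task_count_{0};      // incremented on each execute()
      std::atomic<int> current_workers_{0}; // live worker threads
      bool daemon_threads_ = true;
      std::vector<std::thread> threads_;

      void startWorker() {
        std::thread t([] { /* run_tasks() loop would go here */ });
        if (daemon_threads_) {
          t.detach();  // daemon workers are never joined at shutdown
        }
        // A detached std::thread is non-joinable and safe to move/destroy.
        threads_.push_back(std::move(t));
        current_workers_++;
      }

      // Requeue decision for a task that asks to run again: when tasks
      // outnumber workers, route it through the locked priority queue
      // instead of straight back onto the lock-free worker queue, so a
      // single self-renewing task cannot starve the others.
      bool deprioritizeRenewedTask() const {
        return task_count_ > current_workers_;
      }
    };

In the patch itself, the same decision appears in run_tasks(): a renewed task is pushed onto worker_priority_queue_ when task_count_ > current_workers_, and re-enqueued onto the lock-free worker_queue_ otherwise.
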
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index f04e319..9fc47f5 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -220,6 +220,7 @@
running_(false),
controller_service_provider_(controller_service_provider) {
current_workers_ = 0;
+ task_count_ = 0;
thread_manager_ = nullptr;
}
@@ -232,6 +233,7 @@
controller_service_provider_(std::move(other.controller_service_provider_)),
thread_manager_(std::move(other.thread_manager_)) {
current_workers_ = 0;
+ task_count_ = 0;
}
~ThreadPool() {
@@ -339,6 +341,7 @@
int max_worker_threads_;
// current worker tasks.
std::atomic<int> current_workers_;
+ std::atomic<int> task_count_;
// thread queue
std::vector<std::shared_ptr<WorkerThread>> thread_queue_;
// manager thread
@@ -383,7 +386,6 @@
template<typename T>
bool ThreadPool<T>::execute(Worker<T> &&task, std::future<T> &future) {
-
{
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
task_status_[task.getIdentifier()] = true;
@@ -393,6 +395,9 @@
if (running_) {
tasks_available_.notify_one();
}
+
+ task_count_++;
+
return enqueued;
}
@@ -428,6 +433,9 @@
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
auto worker_thread = std::make_shared<WorkerThread>();
worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread));
+ if (daemon_threads_) {
+ worker_thread->thread_.detach();
+ }
thread_queue_.push_back(worker_thread);
current_workers_++;
}
@@ -456,7 +464,6 @@
uint64_t wait_decay_ = 0;
uint64_t yield_backoff = 10; // start at 10 ms
while (running_.load()) {
-
if (UNLIKELY(thread_reduction_count_ > 0)) {
if (--thread_reduction_count_ >= 0) {
deceased_thread_queue_.enqueue(thread);
@@ -493,66 +500,73 @@
yield_backoff = 10;
}
Worker<T> task;
- if (!worker_queue_.try_dequeue(task)) {
- std::unique_lock<std::mutex> lock(worker_queue_mutex_);
- if (worker_priority_queue_.size() > 0) {
- // this is safe as we are going to immediately pop the queue
- while (!worker_priority_queue_.empty()) {
- task = std::move(const_cast<Worker<T>&>(worker_priority_queue_.top()));
- worker_priority_queue_.pop();
- worker_queue_.enqueue(std::move(task));
- continue;
+
+ bool prioritized_task = false;
+
+ if (!prioritized_task) {
+ if (!worker_queue_.try_dequeue(task)) {
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ if (worker_priority_queue_.size() > 0) {
+ // this is safe as we are going to immediately pop the queue
+ while (!worker_priority_queue_.empty()) {
+ task = std::move(const_cast<Worker<T>&>(worker_priority_queue_.top()));
+ worker_priority_queue_.pop();
+ worker_queue_.enqueue(std::move(task));
+ continue;
+ }
+
}
-
- }
- tasks_available_.wait_for(lock, waitperiod);
- continue;
- } else {
- std::unique_lock<std::mutex> lock(worker_queue_mutex_);
- if (!task_status_[task.getIdentifier()]) {
+ tasks_available_.wait_for(lock, waitperiod);
continue;
- }
- }
-
- bool wait_to_run = false;
- if (task.getTimeSlice() > 1) {
- double wt = (double) task.getWaitTime();
- auto now = std::chrono::system_clock::now().time_since_epoch();
- auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
- // if our differential is < 10% of the wait time we will not put the task into a wait state
- // since requeuing will break the time slice contract.
- if ((double) task.getTimeSlice() > ms && ((double) (task.getTimeSlice() - ms)) > (wt * .10)) {
- wait_to_run = true;
- }
- }
- // if we have to wait we re-queue the worker.
- if (wait_to_run) {
- {
+ } else {
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
if (!task_status_[task.getIdentifier()]) {
continue;
}
- // put it on the priority queue
- worker_priority_queue_.push(std::move(task));
}
- //worker_queue_.enqueue(std::move(task));
- wait_decay_ += 25;
- continue;
+ bool wait_to_run = false;
+ if (task.getTimeSlice() > 1) {
+ double wt = (double) task.getWaitTime();
+ auto now = std::chrono::system_clock::now().time_since_epoch();
+ auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
+
+ // if our differential is < 10% of the wait time we will not put the task into a wait state
+ // since requeuing will break the time slice contract.
+ if ((double) task.getTimeSlice() > ms && ((double) (task.getTimeSlice() - ms)) > (wt * .10)) {
+ wait_to_run = true;
+ }
+ }
+ // if we have to wait we re-queue the worker.
+ if (wait_to_run) {
+ {
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ if (!task_status_[task.getIdentifier()]) {
+ continue;
+ }
+ // put it on the priority queue
+ worker_priority_queue_.push(std::move(task));
+ }
+
+ wait_decay_ += 25;
+ continue;
+ }
}
-
const bool task_renew = task.run();
wait_decay_ = 0;
if (task_renew) {
- {
+ if (UNLIKELY(task_count_ > current_workers_)) {
// even if we have more work to do we will not
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
if (!task_status_[task.getIdentifier()]) {
continue;
}
+
+ worker_priority_queue_.push(std::move(task));
+ } else {
+ worker_queue_.enqueue(std::move(task));
}
- worker_queue_.enqueue(std::move(task));
}
}
current_workers_--;