blob: 2554dc24e2bd1b40c46b2d0519a0ca127f44bbaf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_THREAD_POOL_H
#define LIBMINIFI_INCLUDE_THREAD_POOL_H
#include <chrono>
#include <sstream>
#include <iostream>
#include <atomic>
#include <mutex>
#include <map>
#include <vector>
#include <queue>
#include <future>
#include <thread>
#include <functional>
#include "BackTrace.h"
#include "Monitors.h"
#include "core/expect.h"
#include "controllers/ThreadManagementService.h"
#include "concurrentqueue.h"
#include "core/controller/ControllerService.h"
#include "core/controller/ControllerServiceProvider.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace utils {
/**
* Worker task
* purpose: Provides a wrapper for the functor
* and returns a future based on the template argument.
*/
template<typename T>
class Worker {
public:
explicit Worker(std::function<T()> &task, const std::string &identifier, std::unique_ptr<AfterExecute<T>> run_determinant)
: identifier_(identifier),
next_exec_time_(std::chrono::steady_clock::now()),
task(task),
run_determinant_(std::move(run_determinant)) {
promise = std::make_shared<std::promise<T>>();
}
explicit Worker(std::function<T()> &task, const std::string &identifier)
: identifier_(identifier),
next_exec_time_(std::chrono::steady_clock::now()),
task(task),
run_determinant_(nullptr) {
promise = std::make_shared<std::promise<T>>();
}
explicit Worker(const std::string identifier = "")
: identifier_(identifier),
next_exec_time_(std::chrono::steady_clock::now()) {
}
virtual ~Worker() {
}
/**
* Move constructor for worker tasks
*/
Worker (Worker &&other) noexcept
: identifier_(std::move(other.identifier_)),
next_exec_time_(std::move(other.next_exec_time_)),
task(std::move(other.task)),
run_determinant_(std::move(other.run_determinant_)),
promise(other.promise) {
}
/**
* Runs the task and takes the output from the functor
* setting the result into the promise
* @return whether or not to continue running
* false == finished || error
* true == run again
*/
virtual bool run() {
T result = task();
if (run_determinant_ == nullptr || (run_determinant_->isFinished(result) || run_determinant_->isCancelled(result))) {
promise->set_value(result);
return false;
}
next_exec_time_ += run_determinant_->wait_time();
return true;
}
virtual void setIdentifier(const std::string identifier) {
identifier_ = identifier;
}
virtual std::chrono::time_point<std::chrono::steady_clock> getNextExecutionTime() const {
return next_exec_time_;
}
virtual std::chrono::milliseconds getWaitTime() const {
return run_determinant_->wait_time();
}
Worker<T>(const Worker<T>&) = delete;
Worker<T>& operator= (const Worker<T>&) = delete;
Worker<T>& operator= (Worker<T> &&) noexcept;
std::shared_ptr<std::promise<T>> getPromise() const;
const std::string &getIdentifier() const {
return identifier_;
}
protected:
std::string identifier_;
std::chrono::time_point<std::chrono::steady_clock> next_exec_time_;
std::function<T()> task;
std::unique_ptr<AfterExecute<T>> run_determinant_;
std::shared_ptr<std::promise<T>> promise;
};
template<typename T>
class DelayedTaskComparator {
public:
bool operator()(Worker<T> &a, Worker<T> &b) {
return a.getNextExecutionTime() > b.getNextExecutionTime();
}
};
template<typename T>
Worker<T>& Worker<T>::operator =(Worker<T> && other) noexcept {
task = std::move(other.task);
promise = other.promise;
next_exec_time_ = std::move(other.next_exec_time_);
identifier_ = std::move(other.identifier_);
run_determinant_ = std::move(other.run_determinant_);
return *this;
}
template<typename T>
std::shared_ptr<std::promise<T>> Worker<T>::getPromise() const {
return promise;
}
class WorkerThread {
public:
explicit WorkerThread(std::thread thread, const std::string &name = "NamelessWorker")
: is_running_(false),
thread_(std::move(thread)),
name_(name) {
}
WorkerThread(const std::string &name = "NamelessWorker")
: is_running_(false),
name_(name) {
}
std::atomic<bool> is_running_;
std::thread thread_;
std::string name_;
};
/**
* Thread pool
* Purpose: Provides a thread pool with basic functionality similar to
* ThreadPoolExecutor
* Design: Locked control over a manager thread that controls the worker threads
*/
template<typename T>
class ThreadPool {
public:
ThreadPool(int max_worker_threads = 2, bool daemon_threads = false, const std::shared_ptr<core::controller::ControllerServiceProvider> &controller_service_provider = nullptr,
const std::string &name = "NamelessPool")
: daemon_threads_(daemon_threads),
thread_reduction_count_(0),
max_worker_threads_(max_worker_threads),
adjust_threads_(false),
running_(false),
controller_service_provider_(controller_service_provider),
name_(name) {
current_workers_ = 0;
task_count_ = 0;
thread_manager_ = nullptr;
}
ThreadPool(const ThreadPool<T> &other) = delete;
ThreadPool<T>& operator=(const ThreadPool<T> &other) = delete;
ThreadPool(ThreadPool<T> &&other) = delete;
ThreadPool<T>& operator=(ThreadPool<T> &&other) = delete;
~ThreadPool() {
shutdown();
}
/**
* Execute accepts a worker task and returns
* a future
* @param task this thread pool will subsume ownership of
* the worker task
* @param future future to move new promise to
* @return true if future can be created and thread pool is in a running state.
*/
bool execute(Worker<T> &&task, std::future<T> &future);
/**
* attempts to stop tasks with the provided identifier.
* @param identifier for worker tasks. Note that these tasks won't
* immediately stop.
*/
void stopTasks(const std::string &identifier);
/**
* Returns true if a task is running.
*/
bool isTaskRunning(const std::string &identifier) const {
try {
return task_status_.at(identifier) == true;
} catch (const std::out_of_range &e) {
return false;
}
}
bool isRunning() const {
return running_.load();
}
std::vector<BackTrace> getTraces() {
std::vector<BackTrace> traces;
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
std::unique_lock<std::mutex> wlock(worker_queue_mutex_);
// while we may be checking if running, we don't want to
// use the threads outside of the manager mutex's lock -- therefore we will
// obtain a lock so we can keep the threads in memory
if (running_) {
for (const auto &worker : thread_queue_) {
if (worker->is_running_)
traces.emplace_back(TraceResolver::getResolver().getBackTrace(worker->name_, worker->thread_.native_handle()));
}
}
return traces;
}
/**
* Starts the Thread Pool
*/
void start();
/**
* Shutdown the thread pool and clear any
* currently running activities
*/
void shutdown();
/**
* Set the max concurrent tasks. When this is done
* we must start and restart the thread pool if
* the number of tasks is less than the currently configured number
*/
void setMaxConcurrentTasks(uint16_t max) {
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
bool was_running = running_;
if (was_running) {
shutdown();
}
max_worker_threads_ = max;
if (was_running)
start();
}
void setControllerServiceProvider(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider) {
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
bool was_running = running_;
if (was_running) {
shutdown();
}
controller_service_provider_ = controller_service_provider;
if (was_running)
start();
}
protected:
std::thread createThread(std::function<void()> &&functor) {
return std::thread([ functor ]() mutable {
functor();
});
}
/**
* Drain will notify tasks to stop following notification
*/
void drain() {
while (current_workers_ > 0) {
tasks_available_.notify_one();
}
}
// determines if threads are detached
bool daemon_threads_;
std::atomic<int> thread_reduction_count_;
// max worker threads
int max_worker_threads_;
// current worker tasks.
std::atomic<int> current_workers_;
std::atomic<int> task_count_;
// thread queue
std::vector<std::shared_ptr<WorkerThread>> thread_queue_;
// manager thread
std::thread manager_thread_;
// the thread responsible for putting delayed tasks to the worker queue when they had to be put
std::thread delayed_scheduler_thread_;
// conditional that's used to adjust the threads
std::atomic<bool> adjust_threads_;
// atomic running boolean
std::atomic<bool> running_;
// controller service provider
std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider_;
// integrated power manager
std::shared_ptr<controllers::ThreadManagementService> thread_manager_;
// thread queue for the recently deceased threads.
moodycamel::ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
// worker queue of worker objects
moodycamel::ConcurrentQueue<Worker<T>> worker_queue_;
std::priority_queue<Worker<T>, std::vector<Worker<T>>, DelayedTaskComparator<T>> delayed_worker_queue_;
// notification for available work
std::condition_variable tasks_available_;
// notification for new delayed tasks that's before the current ones
std::condition_variable delayed_task_available_;
// map to identify if a task should be
std::map<std::string, bool> task_status_;
// manager mutex
std::recursive_mutex manager_mutex_;
// work queue mutex
std::mutex worker_queue_mutex_;
// thread pool name
std::string name_;
/**
* Call for the manager to start worker threads
*/
void manageWorkers();
/**
* Function to adjust the workers up and down.
*/
void adjustWorkers(int count);
/**
* Runs worker tasks
*/
void run_tasks(std::shared_ptr<WorkerThread> thread);
void manage_delayed_queue();
};
} /* namespace utils */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
#endif