blob: e917f4a478b8ebdf0d5897ef6f285e6af1c8daf8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <thrift/thrift-config.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/concurrency/Monitor.h>
#include <memory>
#include <stdexcept>
#include <deque>
#include <set>
namespace apache {
namespace thrift {
namespace concurrency {
using std::shared_ptr;
using std::unique_ptr;
using std::dynamic_pointer_cast;
/**
* ThreadManager class
*
* This class manages a pool of threads. It uses a ThreadFactory to create
* threads. It never actually creates or destroys worker threads, rather
* it maintains statistics on number of idle threads, number of active threads,
* task backlog, and average wait and service times.
*
* There are three different monitors used for signaling different conditions
* however they all share the same mutex_.
*
* @version $Id:$
*/
class ThreadManager::Impl : public ThreadManager {
public:
Impl()
: workerCount_(0),
workerMaxCount_(0),
idleCount_(0),
pendingTaskCountMax_(0),
expiredCount_(0),
state_(ThreadManager::UNINITIALIZED),
monitor_(&mutex_),
maxMonitor_(&mutex_),
workerMonitor_(&mutex_) {}
~Impl() override { stop(); }
void start() override;
void stop() override;
ThreadManager::STATE state() const override { return state_; }
shared_ptr<ThreadFactory> threadFactory() const override {
Guard g(mutex_);
return threadFactory_;
}
void threadFactory(shared_ptr<ThreadFactory> value) override {
Guard g(mutex_);
if (threadFactory_ && threadFactory_->isDetached() != value->isDetached()) {
throw InvalidArgumentException();
}
threadFactory_ = value;
}
void addWorker(size_t value) override;
void removeWorker(size_t value) override;
size_t idleWorkerCount() const override { return idleCount_; }
size_t workerCount() const override {
Guard g(mutex_);
return workerCount_;
}
size_t pendingTaskCount() const override {
Guard g(mutex_);
return tasks_.size();
}
size_t totalTaskCount() const override {
Guard g(mutex_);
return tasks_.size() + workerCount_ - idleCount_;
}
size_t pendingTaskCountMax() const override {
Guard g(mutex_);
return pendingTaskCountMax_;
}
size_t expiredTaskCount() const override {
Guard g(mutex_);
return expiredCount_;
}
void pendingTaskCountMax(const size_t value) {
Guard g(mutex_);
pendingTaskCountMax_ = value;
}
void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) override;
void remove(shared_ptr<Runnable> task) override;
shared_ptr<Runnable> removeNextPending() override;
void removeExpiredTasks() override {
removeExpired(false);
}
void setExpireCallback(ExpireCallback expireCallback) override;
private:
/**
* Remove one or more expired tasks.
* \param[in] justOne if true, try to remove just one task and return
*/
void removeExpired(bool justOne);
/**
* \returns whether it is acceptable to block, depending on the current thread id
*/
bool canSleep() const;
/**
* Lowers the maximum worker count and blocks until enough worker threads complete
* to get to the new maximum worker limit. The caller is responsible for acquiring
* a lock on the class mutex_.
*/
void removeWorkersUnderLock(size_t value);
size_t workerCount_;
size_t workerMaxCount_;
size_t idleCount_;
size_t pendingTaskCountMax_;
size_t expiredCount_;
ExpireCallback expireCallback_;
ThreadManager::STATE state_;
shared_ptr<ThreadFactory> threadFactory_;
friend class ThreadManager::Task;
typedef std::deque<shared_ptr<Task> > TaskQueue;
TaskQueue tasks_;
Mutex mutex_;
Monitor monitor_;
Monitor maxMonitor_;
Monitor workerMonitor_; // used to synchronize changes in worker count
friend class ThreadManager::Worker;
std::set<shared_ptr<Thread> > workers_;
std::set<shared_ptr<Thread> > deadWorkers_;
std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
};
class ThreadManager::Task : public Runnable {
public:
enum STATE { WAITING, EXECUTING, TIMEDOUT, COMPLETE };
Task(shared_ptr<Runnable> runnable, uint64_t expiration = 0ULL)
: runnable_(runnable),
state_(WAITING) {
if (expiration != 0ULL) {
expireTime_.reset(new std::chrono::steady_clock::time_point(std::chrono::steady_clock::now() + std::chrono::milliseconds(expiration)));
}
}
~Task() override = default;
void run() override {
if (state_ == EXECUTING) {
runnable_->run();
state_ = COMPLETE;
}
}
shared_ptr<Runnable> getRunnable() { return runnable_; }
const unique_ptr<std::chrono::steady_clock::time_point> & getExpireTime() const { return expireTime_; }
private:
shared_ptr<Runnable> runnable_;
friend class ThreadManager::Worker;
STATE state_;
unique_ptr<std::chrono::steady_clock::time_point> expireTime_;
};
class ThreadManager::Worker : public Runnable {
enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
public:
Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED) {}
~Worker() override = default;
private:
bool isActive() const {
return (manager_->workerCount_ <= manager_->workerMaxCount_)
|| (manager_->state_ == JOINING && !manager_->tasks_.empty());
}
public:
/**
* Worker entry point
*
* As long as worker thread is running, pull tasks off the task queue and
* execute.
*/
void run() override {
Guard g(manager_->mutex_);
/**
* This method has three parts; one is to check for and account for
* admitting a task which happens under a lock. Then the lock is released
* and the task itself is executed. Finally we do some accounting
* under lock again when the task completes.
*/
/**
* Admitting
*/
/**
* Increment worker semaphore and notify manager if worker count reached
* desired max
*/
bool active = manager_->workerCount_ < manager_->workerMaxCount_;
if (active) {
if (++manager_->workerCount_ == manager_->workerMaxCount_) {
manager_->workerMonitor_.notify();
}
}
while (active) {
/**
* While holding manager monitor block for non-empty task queue (Also
* check that the thread hasn't been requested to stop). Once the queue
* is non-empty, dequeue a task, release monitor, and execute. If the
* worker max count has been decremented such that we exceed it, mark
* ourself inactive, decrement the worker count and notify the manager
* (technically we're notifying the next blocked thread but eventually
* the manager will see it.
*/
active = isActive();
while (active && manager_->tasks_.empty()) {
manager_->idleCount_++;
manager_->monitor_.wait();
active = isActive();
manager_->idleCount_--;
}
shared_ptr<ThreadManager::Task> task;
if (active) {
if (!manager_->tasks_.empty()) {
task = manager_->tasks_.front();
manager_->tasks_.pop_front();
if (task->state_ == ThreadManager::Task::WAITING) {
// If the state is changed to anything other than EXECUTING or TIMEDOUT here
// then the execution loop needs to be changed below.
task->state_ =
(task->getExpireTime() && *(task->getExpireTime()) < std::chrono::steady_clock::now()) ?
ThreadManager::Task::TIMEDOUT :
ThreadManager::Task::EXECUTING;
}
}
/* If we have a pending task max and we just dropped below it, wakeup any
thread that might be blocked on add. */
if (manager_->pendingTaskCountMax_ != 0
&& manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
manager_->maxMonitor_.notify();
}
}
/**
* Execution - not holding a lock
*/
if (task) {
if (task->state_ == ThreadManager::Task::EXECUTING) {
// Release the lock so we can run the task without blocking the thread manager
manager_->mutex_.unlock();
try {
task->run();
} catch (const std::exception& e) {
GlobalOutput.printf("[ERROR] task->run() raised an exception: %s", e.what());
} catch (...) {
GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");
}
// Re-acquire the lock to proceed in the thread manager
manager_->mutex_.lock();
} else if (manager_->expireCallback_) {
// The only other state the task could have been in is TIMEDOUT (see above)
manager_->mutex_.unlock();
manager_->expireCallback_(task->getRunnable());
manager_->mutex_.lock();
manager_->expiredCount_++;
}
}
}
/**
* Final accounting for the worker thread that is done working
*/
manager_->deadWorkers_.insert(this->thread());
if (--manager_->workerCount_ == manager_->workerMaxCount_) {
manager_->workerMonitor_.notify();
}
}
private:
ThreadManager::Impl* manager_;
friend class ThreadManager::Impl;
STATE state_;
};
void ThreadManager::Impl::addWorker(size_t value) {
std::set<shared_ptr<Thread> > newThreads;
for (size_t ix = 0; ix < value; ix++) {
shared_ptr<ThreadManager::Worker> worker
= std::make_shared<ThreadManager::Worker>(this);
newThreads.insert(threadFactory_->newThread(worker));
}
Guard g(mutex_);
workerMaxCount_ += value;
workers_.insert(newThreads.begin(), newThreads.end());
for (const auto & newThread : newThreads) {
shared_ptr<ThreadManager::Worker> worker
= dynamic_pointer_cast<ThreadManager::Worker, Runnable>(newThread->runnable());
worker->state_ = ThreadManager::Worker::STARTING;
newThread->start();
idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >(newThread->getId(), newThread));
}
while (workerCount_ != workerMaxCount_) {
workerMonitor_.wait();
}
}
void ThreadManager::Impl::start() {
Guard g(mutex_);
if (state_ == ThreadManager::STOPPED) {
return;
}
if (state_ == ThreadManager::UNINITIALIZED) {
if (!threadFactory_) {
throw InvalidArgumentException();
}
state_ = ThreadManager::STARTED;
monitor_.notifyAll();
}
while (state_ == STARTING) {
monitor_.wait();
}
}
void ThreadManager::Impl::stop() {
Guard g(mutex_);
bool doStop = false;
if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING
&& state_ != ThreadManager::STOPPED) {
doStop = true;
state_ = ThreadManager::JOINING;
}
if (doStop) {
removeWorkersUnderLock(workerCount_);
}
state_ = ThreadManager::STOPPED;
}
void ThreadManager::Impl::removeWorker(size_t value) {
Guard g(mutex_);
removeWorkersUnderLock(value);
}
void ThreadManager::Impl::removeWorkersUnderLock(size_t value) {
if (value > workerMaxCount_) {
throw InvalidArgumentException();
}
workerMaxCount_ -= value;
if (idleCount_ > value) {
// There are more idle workers than we need to remove,
// so notify enough of them so they can terminate.
for (size_t ix = 0; ix < value; ix++) {
monitor_.notify();
}
} else {
// There are as many or less idle workers than we need to remove,
// so just notify them all so they can terminate.
monitor_.notifyAll();
}
while (workerCount_ != workerMaxCount_) {
workerMonitor_.wait();
}
for (const auto & deadWorker : deadWorkers_) {
// when used with a joinable thread factory, we join the threads as we remove them
if (!threadFactory_->isDetached()) {
deadWorker->join();
}
idMap_.erase(deadWorker->getId());
workers_.erase(deadWorker);
}
deadWorkers_.clear();
}
bool ThreadManager::Impl::canSleep() const {
const Thread::id_t id = threadFactory_->getCurrentThreadId();
return idMap_.find(id) == idMap_.end();
}
void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) {
Guard g(mutex_, timeout);
if (!g) {
throw TimedOutException();
}
if (state_ != ThreadManager::STARTED) {
throw IllegalStateException(
"ThreadManager::Impl::add ThreadManager "
"not started");
}
// if we're at a limit, remove an expired task to see if the limit clears
if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
removeExpired(true);
}
if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
if (canSleep() && timeout >= 0) {
while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
// This is thread safe because the mutex is shared between monitors.
maxMonitor_.wait(timeout);
}
} else {
throw TooManyPendingTasksException();
}
}
tasks_.push_back(std::make_shared<ThreadManager::Task>(value, expiration));
// If idle thread is available notify it, otherwise all worker threads are
// running and will get around to this task in time.
if (idleCount_ > 0) {
monitor_.notify();
}
}
void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
Guard g(mutex_);
if (state_ != ThreadManager::STARTED) {
throw IllegalStateException(
"ThreadManager::Impl::remove ThreadManager not "
"started");
}
for (auto it = tasks_.begin(); it != tasks_.end(); ++it)
{
if ((*it)->getRunnable() == task)
{
tasks_.erase(it);
return;
}
}
}
std::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
Guard g(mutex_);
if (state_ != ThreadManager::STARTED) {
throw IllegalStateException(
"ThreadManager::Impl::removeNextPending "
"ThreadManager not started");
}
if (tasks_.empty()) {
return std::shared_ptr<Runnable>();
}
shared_ptr<ThreadManager::Task> task = tasks_.front();
tasks_.pop_front();
return task->getRunnable();
}
void ThreadManager::Impl::removeExpired(bool justOne) {
// this is always called under a lock
if (tasks_.empty()) {
return;
}
auto now = std::chrono::steady_clock::now();
for (auto it = tasks_.begin(); it != tasks_.end(); )
{
if ((*it)->getExpireTime() && *((*it)->getExpireTime()) < now) {
if (expireCallback_) {
expireCallback_((*it)->getRunnable());
}
it = tasks_.erase(it);
++expiredCount_;
if (justOne) {
return;
}
}
else
{
++it;
}
}
}
void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
Guard g(mutex_);
expireCallback_ = expireCallback;
}
class SimpleThreadManager : public ThreadManager::Impl {
public:
SimpleThreadManager(size_t workerCount = 4, size_t pendingTaskCountMax = 0)
: workerCount_(workerCount), pendingTaskCountMax_(pendingTaskCountMax) {}
void start() override {
ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
ThreadManager::Impl::start();
addWorker(workerCount_);
}
private:
const size_t workerCount_;
const size_t pendingTaskCountMax_;
};
shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
return shared_ptr<ThreadManager>(new ThreadManager::Impl());
}
shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count,
size_t pendingTaskCountMax) {
return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
}
}
}
} // apache::thrift::concurrency