/*
* Copyright 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Author: jmarantz@google.com (Joshua Marantz)
#include "pagespeed/kernel/thread/queued_worker_pool.h"
#include <deque>
#include <set>
#include <vector>
#include "base/logging.h"
#include "pagespeed/kernel/base/abstract_mutex.h"
#include "pagespeed/kernel/base/condvar.h"
#include "pagespeed/kernel/base/function.h"
#include "pagespeed/kernel/base/thread_annotations.h"
#include "pagespeed/kernel/base/thread_system.h"
#include "pagespeed/kernel/base/timer.h"
#include "pagespeed/kernel/base/waveform.h"
#include "pagespeed/kernel/thread/queued_worker.h"
namespace net_instaweb {
namespace {
inline void UpdateWaveform(Waveform* queue_size, int delta) {
if ((queue_size != NULL) && (delta != 0)) {
queue_size->AddDelta(delta);
}
}
const size_t kUnboundedQueue = 0;
} // namespace
QueuedWorkerPool::QueuedWorkerPool(
int max_workers, StringPiece thread_name_base, ThreadSystem* thread_system)
: thread_system_(thread_system),
mutex_(thread_system_->NewMutex()),
max_workers_(max_workers),
shutdown_(false),
queue_size_(NULL),
load_shedding_threshold_(kNoLoadShedding) {
thread_name_base.CopyToString(&thread_name_base_);
}
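
// A minimal usage sketch (illustration only, not part of this file's
// logic). It assumes a ThreadSystem implementation is available -- e.g.
// from Platform::CreateThreadSystem() -- and that 'task' is a Function*
// built with MakeFunction from pagespeed/kernel/base/function.h:
//
//   QueuedWorkerPool pool(4 /* max_workers */, "demo", thread_system);
//   QueuedWorkerPool::Sequence* sequence = pool.NewSequence();
//   sequence->Add(task);  // Tasks in one sequence run FIFO, one at a time.
//   pool.ShutDown();      // Waits for running tasks; cancels pending ones.
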
QueuedWorkerPool::~QueuedWorkerPool() {
ShutDown();
// Final shutdown (in case ShutDown was not called) and deletion of
// sequences.
for (int i = 0, n = all_sequences_.size(); i < n; ++i) {
Sequence* sequence = all_sequences_[i];
sequence->WaitForShutDown();
delete sequence;
}
}
void QueuedWorkerPool::ShutDown() {
InitiateShutDown();
WaitForShutDownComplete();
}
void QueuedWorkerPool::InitiateShutDown() {
  // Set the shutdown flag so that no one adds any more sequences.
{
ScopedMutex lock(mutex_.get());
if (shutdown_) {
// ShutDown might be called explicitly and also from the destructor.
// No workers should have magically re-appeared while in shutdown mode,
// although the all_sequences_ vector may be non-empty since we don't
// delete those till the pool itself is deleted.
DCHECK(active_workers_.empty());
DCHECK(available_workers_.empty());
return;
}
shutdown_ = true;
}
// Clear out all the sequences, so that no one adds any more runnable
// functions. We don't need to lock our access to all_sequences_ as
// that can only be mutated when shutdown_ == false.
for (int i = 0, n = all_sequences_.size(); i < n; ++i) {
Sequence* sequence = all_sequences_[i];
sequence->InitiateShutDown();
// Do not delete the sequence; just leave it in shutdown-mode so no
// further tasks will be started in the thread.
}
}
void QueuedWorkerPool::WaitForShutDownComplete() {
DCHECK(shutdown_);
  // The sequence shutdown was initiated in ::InitiateShutDown, and now
  // we must wait for all the sequences to exit before we can delete the
  // worker objects; otherwise segfaults will occur.
for (int i = 0, n = all_sequences_.size(); i < n; ++i) {
Sequence* sequence = all_sequences_[i];
sequence->WaitForShutDown();
// Do not delete the sequence; just leave it in shutdown-mode so no
// further tasks will be started in the thread.
}
// Wait for all workers to complete whatever they were doing.
//
// TODO(jmarantz): attempt to cancel in-progress functions via
// Function::set_quit_requested. For now, we just complete the
// currently running functions and then shut down.
while (true) {
QueuedWorker* worker = NULL;
{
ScopedMutex lock(mutex_.get());
if (active_workers_.empty()) {
break;
}
std::set<QueuedWorker*>::iterator p = active_workers_.begin();
worker = *p;
active_workers_.erase(p);
}
worker->ShutDown();
delete worker;
}
  // At this point there are no active tasks or workers, so we no longer
  // need to hold the mutex.
for (int i = 0, n = available_workers_.size(); i < n; ++i) {
QueuedWorker* worker = available_workers_[i];
worker->ShutDown();
delete worker;
}
available_workers_.clear();
}
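
// When several pools must be torn down, the split between
// InitiateShutDown() and WaitForShutDownComplete() lets their sequences
// wind down in parallel rather than one pool at a time. A sketch (pool
// names hypothetical):
//
//   html_pool->InitiateShutDown();
//   cache_pool->InitiateShutDown();
//   html_pool->WaitForShutDownComplete();
//   cache_pool->WaitForShutDownComplete();
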
// Runs queued tasks through a worker. Note that a first candidate
// sequence is passed into this method, but we move on to another queued
// sequence once the passed-in one is exhausted.
void QueuedWorkerPool::Run(Sequence* sequence, QueuedWorker* worker) {
while (sequence != NULL) {
// This is a little unfair but we will continue to pull tasks from
// the same sequence and run them until the sequence is exhausted. This
// avoids locking the pool's central mutex every time we want to
// run a new task; we need only mutex at the sequence level.
while (Function* function = sequence->NextFunction()) {
function->CallRun();
}
    // Once a sequence is exhausted, see if another sequence is queued.
    // If there are no queued sequences, the worker is put back on the
    // 'available' list to wait for another Sequence::Add.
sequence = AssignWorkerToNextSequence(worker);
}
}
QueuedWorkerPool::Sequence* QueuedWorkerPool::AssignWorkerToNextSequence(
QueuedWorker* worker) {
Sequence* sequence = NULL;
ScopedMutex lock(mutex_.get());
if (!shutdown_) {
if (queued_sequences_.empty()) {
int erased = active_workers_.erase(worker);
DCHECK_EQ(1, erased);
available_workers_.push_back(worker);
} else {
sequence = queued_sequences_.front();
queued_sequences_.pop_front();
}
}
return sequence;
}
void QueuedWorkerPool::QueueSequence(Sequence* sequence) {
QueuedWorker* worker = NULL;
Sequence* drop_sequence = NULL;
{
ScopedMutex lock(mutex_.get());
if (available_workers_.empty()) {
      // If we haven't yet started our full allotment of threads, add
      // workers on demand until we hit that limit.
if (active_workers_.size() < max_workers_) {
worker =
new QueuedWorker(StrCat(thread_name_base_, "-",
IntegerToString(active_workers_.size())),
thread_system_);
worker->Start();
active_workers_.insert(worker);
} else {
// No workers available: must queue the sequence.
queued_sequences_.push_back(sequence);
// If too many sequences are waiting, we will cancel the oldest
// waiting one.
if ((load_shedding_threshold_ != kNoLoadShedding) &&
(queued_sequences_.size() >
static_cast<size_t>(load_shedding_threshold_))) {
drop_sequence = queued_sequences_.front();
queued_sequences_.pop_front();
}
}
} else {
// We pulled a worker off the free-stack.
worker = available_workers_.back();
available_workers_.pop_back();
active_workers_.insert(worker);
}
}
if (drop_sequence != NULL) {
drop_sequence->Cancel();
}
// Run the worker without holding the Pool lock.
if (worker != NULL) {
worker->RunInWorkThread(
new MemberFunction2<QueuedWorkerPool, QueuedWorkerPool::Sequence*,
QueuedWorker*>(
&QueuedWorkerPool::Run, this, sequence, worker));
}
}
bool QueuedWorkerPool::AreBusy(const SequenceSet& sequences)
NO_THREAD_SAFETY_ANALYSIS {
  // This is the only operation that accesses multiple sequences at once.
  // We order our lock acquisitions by address to get two-phase locking
  // and thus avoid deadlock; SequenceSet already keeps the sequences in
  // that order for us.
for (SequenceSet::iterator i = sequences.begin(); i != sequences.end(); ++i) {
(*i)->sequence_mutex_->Lock();
}
bool busy = false;
for (SequenceSet::iterator i = sequences.begin(); i != sequences.end(); ++i) {
if ((*i)->IsBusy()) {
busy = true;
break;
}
}
for (SequenceSet::iterator i = sequences.begin(); i != sequences.end(); ++i) {
(*i)->sequence_mutex_->Unlock();
}
return busy;
}
void QueuedWorkerPool::SetLoadSheddingThreshold(int x) {
DCHECK((x > 0) || (x == kNoLoadShedding));
load_shedding_threshold_ = x;
}
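
// A sketch of enabling load-shedding; the threshold here is arbitrary,
// for illustration:
//
//   pool.SetLoadSheddingThreshold(500);
//   // Now whenever more than 500 sequences are queued waiting for a
//   // worker, QueueSequence cancels the oldest queued sequence.
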
QueuedWorkerPool::Sequence* QueuedWorkerPool::NewSequence() {
ScopedMutex lock(mutex_.get());
Sequence* sequence = NULL;
if (!shutdown_) {
if (free_sequences_.empty()) {
sequence = new Sequence(thread_system_, this);
sequence->set_queue_size_stat(queue_size_);
all_sequences_.push_back(sequence);
} else {
sequence = free_sequences_.back();
free_sequences_.pop_back();
sequence->Reset();
}
}
return sequence;
}
void QueuedWorkerPool::FreeSequence(Sequence* sequence) {
// If the sequence is inactive, then we can immediately
// recycle it. But if the sequence was busy, then we must
// wait until it completes its last function to recycle it.
// This will happen in QueuedWorkerPool::Sequence::NextFunction,
// which will then call SequenceNoLongerActive.
if (sequence->InitiateShutDown()) {
ScopedMutex lock(mutex_.get());
free_sequences_.push_back(sequence);
}
}
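
// Sequence lifecycle sketch: sequences are owned by the pool, so callers
// hand them back with FreeSequence() rather than deleting them ('task'
// below is a hypothetical Function*):
//
//   QueuedWorkerPool::Sequence* seq = pool.NewSequence();
//   seq->Add(task);
//   pool.FreeSequence(seq);  // Recycled once its last task completes.
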
void QueuedWorkerPool::SequenceNoLongerActive(Sequence* sequence) {
ScopedMutex lock(mutex_.get());
if (!shutdown_) {
free_sequences_.push_back(sequence);
}
}
QueuedWorkerPool::Sequence::Sequence(ThreadSystem* thread_system,
QueuedWorkerPool* pool)
: sequence_mutex_(thread_system->NewMutex()),
pool_(pool),
termination_condvar_(sequence_mutex_->NewCondvar()),
queue_size_(NULL),
max_queue_size_(kUnboundedQueue) {
Reset();
}
void QueuedWorkerPool::Sequence::Reset() {
shutdown_ = false;
active_ = false;
DCHECK(work_queue_.empty());
}
QueuedWorkerPool::Sequence::~Sequence() {
DCHECK(shutdown_);
DCHECK(work_queue_.empty());
}
QueuedWorkerPool::Sequence::AddFunction::~AddFunction() {
}
bool QueuedWorkerPool::Sequence::InitiateShutDown() {
ScopedMutex lock(sequence_mutex_.get());
shutdown_ = true;
return !active_;
}
void QueuedWorkerPool::Sequence::WaitForShutDown() {
int num_canceled = 0;
{
ScopedMutex lock(sequence_mutex_.get());
shutdown_ = true;
pool_ = NULL;
while (active_) {
// We use a TimedWait rather than a Wait so that we don't deadlock if
// active_ turns false after the above check and before the call to
// TimedWait.
termination_condvar_->TimedWait(Timer::kSecondMs);
}
num_canceled = CancelTasksOnWorkQueue();
DCHECK(work_queue_.empty());
}
UpdateWaveform(queue_size_, -num_canceled);
}
int QueuedWorkerPool::Sequence::CancelTasksOnWorkQueue() {
int num_canceled = 0;
while (!work_queue_.empty()) {
Function* function = work_queue_.front();
work_queue_.pop_front();
sequence_mutex_->Unlock();
function->CallCancel();
++num_canceled;
sequence_mutex_->Lock();
}
return num_canceled;
}
void QueuedWorkerPool::Sequence::Cancel() {
int num_canceled = 0;
{
ScopedMutex lock(sequence_mutex_.get());
num_canceled = CancelTasksOnWorkQueue();
}
UpdateWaveform(queue_size_, -num_canceled);
}
void QueuedWorkerPool::Sequence::Add(Function* function) {
bool queue_sequence = false;
bool cancel = false;
{
ScopedMutex lock(sequence_mutex_.get());
if (shutdown_) {
LOG(WARNING) << "Adding function to sequence " << this
<< " after shutdown";
cancel = true;
} else {
Function* function_to_add = function;
if ((max_queue_size_ != kUnboundedQueue) &&
(work_queue_.size() >= max_queue_size_)) {
// Overflowing a bounded queue cancels the oldest function. We
// cancel old ones because those are likely to be lookups on behalf
// of older HTML requests that are waiting to be retired. We'd rather
// retire them without optimization than delay them further with a
// slow cache.
function = work_queue_.front();
work_queue_.pop_front();
cancel = true;
}
work_queue_.push_back(function_to_add);
queue_sequence = (!active_ && (work_queue_.size() == 1));
}
}
if (cancel) {
function->CallCancel();
}
if (queue_sequence) {
pool_->QueueSequence(this);
}
UpdateWaveform(queue_size_, cancel ? 0 : 1);
}
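
// A sketch of the bounded-queue behavior above (assuming the
// set_max_queue_size() setter declared in the header; the bound is
// arbitrary):
//
//   sequence->set_max_queue_size(10);
//   // With 10 functions already pending, the next Add() cancels the
//   // oldest pending function rather than growing the queue.
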
void QueuedWorkerPool::Sequence::CancelPendingFunctions() {
std::deque<Function*> cancel_queue;
{
ScopedMutex lock(sequence_mutex_.get());
work_queue_.swap(cancel_queue);
}
UpdateWaveform(queue_size_, -static_cast<int>(cancel_queue.size()));
while (!cancel_queue.empty()) {
Function* f = cancel_queue.front();
cancel_queue.pop_front();
f->CallCancel();
}
}
Function* QueuedWorkerPool::Sequence::NextFunction() {
Function* function = NULL;
QueuedWorkerPool* release_to_pool = NULL;
int queue_size_delta = 0;
{
ScopedMutex lock(sequence_mutex_.get());
if (shutdown_) {
if (active_) {
if (!work_queue_.empty()) {
LOG(WARNING) << "Canceling " << work_queue_.size()
<< " functions on sequence Shutdown";
queue_size_delta -= CancelTasksOnWorkQueue();
}
active_ = false;
        // Note that after the Signal(), the current sequence may be
        // deleted if we are in the process of shutting down the
        // entire pool, so no further access to member variables is
        // allowed. Hence we copy the pool_ variable to a local temp
        // for use after releasing the lock. Note also that if the
        // pool is in the process of shutting down, then pool_ will be
        // NULL, so we won't bother to add this sequence to the
        // free_sequences_ list. In any case it will be cleaned up on
        // shutdown via all_sequences_.
release_to_pool = pool_;
termination_condvar_->Signal();
}
} else if (work_queue_.empty()) {
active_ = false;
} else {
function = work_queue_.front();
work_queue_.pop_front();
active_ = true;
--queue_size_delta;
}
}
if (release_to_pool != NULL) {
// If the entire pool is in the process of shutting down when
// NextFunction is called, we don't need to add this to the
// free list; the pool will directly delete all sequences from
// QueuedWorkerPool::ShutDown().
release_to_pool->SequenceNoLongerActive(this);
}
UpdateWaveform(queue_size_, queue_size_delta);
return function;
}
bool QueuedWorkerPool::Sequence::IsBusy() {
return active_ || !work_queue_.empty();
}
} // namespace net_instaweb