blob: abff7362c49285898ef37ce20faad0e2778871d0 [file] [log] [blame]
/*
* Copyright 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Author: morlovich@google.com (Maksim Orlovich)
//
// Implements Worker, base class for various run-in-a-thread classes,
// via Worker::WorkThread.
#include "pagespeed/kernel/thread/worker.h"
#include <deque>
#include "base/logging.h"
#include "pagespeed/kernel/base/abstract_mutex.h"
#include "pagespeed/kernel/base/atomic_bool.h"
#include "pagespeed/kernel/base/basictypes.h"
#include "pagespeed/kernel/base/condvar.h"
#include "pagespeed/kernel/base/function.h"
#include "pagespeed/kernel/base/scoped_ptr.h"
#include "pagespeed/kernel/base/thread.h"
#include "pagespeed/kernel/base/thread_system.h"
#include "pagespeed/kernel/base/waveform.h"
namespace net_instaweb {
// The actual thread that does the work.
//
// Owns the queue of pending Function tasks and runs them one at a time on a
// joinable thread. The mutable bookkeeping state (tasks_, current_task_,
// exit_, started_) is accessed with mutex_ held; state_change_ is signalled
// when a task is queued while no task is running, and when shutdown is
// requested.
class Worker::WorkThread : public ThreadSystem::Thread {
 public:
  // Creates the thread object in the "not started" state; nothing runs until
  // Start() is called.
  WorkThread(Worker* owner, StringPiece thread_name, ThreadSystem* runtime)
      : Thread(runtime, thread_name, ThreadSystem::kJoinable),
        owner_(owner),
        mutex_(runtime->NewMutex()),
        state_change_(mutex_->NewCondvar()),
        current_task_(NULL),
        exit_(false),
        started_(false) {
    quit_requested_.set_value(false);
  }

  // If worker thread exit is requested, returns NULL.  Returns next
  // pending task, also setting it in current_task_ otherwise.  Takes
  // care of synchronization, including waiting for next state change.
  Function* GetNextTask() {
    ScopedMutex lock(mutex_.get());

    // Clean any task we were running last iteration.
    current_task_ = NULL;

    // Sleep until there is either something to run or a shutdown request.
    while (!exit_ && tasks_.empty()) {
      state_change_->Wait();
    }

    // Handle exit.  Tasks still queued are left for ShutDown() to cancel.
    if (exit_) {
      return NULL;
    }

    // Get task.
    current_task_ = tasks_.front();
    tasks_.pop_front();
    owner_->UpdateQueueSizeStat(-1);
    return current_task_;
  }

  // Thread body: keeps pulling tasks until GetNextTask() reports shutdown.
  virtual void Run() {
    Function* task;
    while ((task = GetNextTask()) != NULL) {
      // Run tasks (not holding the lock, so new tasks can be added).
      // The task is handed a pointer to the flag that ShutDown() sets.
      task->set_quit_requested_pointer(&quit_requested_);
      task->CallRun();
    }
  }

  // Requests thread exit, waits for the thread to finish, and cancels every
  // task that was still queued.  Does nothing if the thread was never started
  // or shutdown was already requested.
  void ShutDown() {
    {
      ScopedMutex lock(mutex_.get());
      if (exit_ || !started_) {
        // Already shutdown, or was never started in the first place.
        return;
      }
      exit_ = true;
      if (current_task_ != NULL) {
        quit_requested_.set_value(true);
      }
      state_change_->Signal();
    }

    Join();

    // Other threads may still be calling QueueIfPermitted(), so the queue is
    // detached and started_ is cleared in a single critical section: every
    // task queued before this point ends up in 'cancelled', and every later
    // QueueIfPermitted() call sees !started_ and cancels the task itself.
    std::deque<Function*> cancelled;
    {
      ScopedMutex lock(mutex_.get());
      cancelled.swap(tasks_);
      started_ = false;  // Reject further jobs on explicit shutdown.
      owner_->UpdateQueueSizeStat(-static_cast<int>(cancelled.size()));
    }

    // Cancel outside the lock so a cancel handler cannot deadlock on mutex_.
    while (!cancelled.empty()) {
      Function* closure = cancelled.front();
      cancelled.pop_front();
      closure->CallCancel();
    }
  }

  // Starts the underlying thread unless it is already running or shutdown
  // has been requested.  Logs an error if the thread cannot be started.
  void Start() {
    ScopedMutex lock(mutex_.get());
    if (!started_ && !exit_) {
      started_ = Thread::Start();
      if (!started_) {
        LOG(ERROR) << "Unable to start worker thread";
      }
    }
  }

  // Queues closure if the owner's IsPermitted() policy allows it.
  // Returns false only when the policy rejects the closure.  If the thread is
  // not started (never started, or already shut down) the closure is
  // cancelled and true is returned.
  bool QueueIfPermitted(Function* closure) {
    {
      ScopedMutex lock(mutex_.get());
      // started_ is examined under the lock so that this check cannot
      // interleave with ShutDown() detaching the queue.
      if (started_) {
        if (!owner_->IsPermitted(closure)) {
          return false;
        }
        tasks_.push_back(closure);
        owner_->UpdateQueueSizeStat(1);
        if (current_task_ == NULL) {  // wake the thread up if it's idle.
          state_change_->Signal();
        }
        return true;
      }
    }

    // Not started: cancel without holding the lock.
    closure->CallCancel();
    return true;
  }

  // Returns the number of queued tasks plus one if a task is recorded as
  // current.  Must be called with mutex held.
  int NumJobs() {
    int num = static_cast<int>(tasks_.size());
    if (current_task_ != NULL) {
      ++num;
    }
    return num;
  }

  // Returns true if a task is recorded as current or any task is queued.
  bool IsBusy() const {
    ScopedMutex lock(mutex_.get());
    return (current_task_ != NULL) || !tasks_.empty();
  }

 private:
  Worker* owner_;

  // guards state_change_, current_task_, tasks_, exit_, started_;
  scoped_ptr<ThreadSystem::CondvarCapableMutex> mutex_;
  scoped_ptr<ThreadSystem::Condvar> state_change_;
  Function* current_task_;  // non-NULL if we are actually running something.
  std::deque<Function*> tasks_;  // things waiting to be run.
  bool exit_;     // set once shutdown has been requested.
  bool started_;  // true between a successful Start() and ShutDown().
  AtomicBool quit_requested_;  // handed to each task before it runs.

  DISALLOW_COPY_AND_ASSIGN(WorkThread);
};
// Builds the worker together with its WorkThread. The thread object is only
// constructed here; it does not begin running until Start() is called.
// queue_size_ starts out NULL, which makes UpdateQueueSizeStat() a no-op
// until it is set elsewhere (not visible in this file section).
Worker::Worker(StringPiece thread_name, ThreadSystem* runtime)
: thread_(new WorkThread(this, thread_name, runtime)),
queue_size_(NULL) {
}
// Shuts the worker thread down on destruction. WorkThread::ShutDown()
// returns immediately if the thread was never started or shutdown was
// already requested, so an earlier explicit ShutDown() call is harmless.
Worker::~Worker() {
thread_->ShutDown();
}
// Starts the worker thread. Delegates to WorkThread::Start(), which does
// nothing if the thread is already started or shutdown has been requested,
// and logs an error if the thread cannot be started.
void Worker::Start() {
thread_->Start();
}
// Reports whether the worker thread has a task recorded as current or has
// tasks waiting in its queue.
bool Worker::IsBusy() {
  const bool busy = thread_->IsBusy();
  return busy;
}
// Offers closure to the worker thread and relays its answer: false when the
// IsPermitted() policy rejects the closure, true otherwise (including the
// case where the thread is not started and the closure is cancelled instead
// of queued).
bool Worker::QueueIfPermitted(Function* closure) {
  const bool accepted = thread_->QueueIfPermitted(closure);
  return accepted;
}
// Returns the number of queued tasks plus the current one, if any.
// This wrapper does not acquire the worker thread's mutex, while
// WorkThread::NumJobs() is documented as requiring it to be held, so the
// caller is expected to already be inside that lock.
int Worker::NumJobs() {
  const int job_count = thread_->NumJobs();
  return job_count;
}
// Stops the worker thread. Delegates to WorkThread::ShutDown(), which sets
// the quit-requested flag if a task is recorded as current, joins the
// thread, and cancels tasks still in the queue. No-op if the thread was
// never started or shutdown was already requested.
void Worker::ShutDown() {
thread_->ShutDown();
}
// Applies a signed change to the queue-size statistic. Silently does
// nothing when no statistic object is present (queue_size_ is NULL).
void Worker::UpdateQueueSizeStat(int value) {
  if (queue_size_ == NULL) {
    return;
  }
  queue_size_->AddDelta(value);
}
} // namespace net_instaweb