| // Copyright 2011 Google Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| // Authors: jmarantz@google.com (Joshua Marantz) |
| // jmaessen@google.com (Jan Maessen) |
| |
| #include "pagespeed/kernel/thread/scheduler.h" |
| |
| #include <algorithm> |
| #include <set> |
| |
| #include "base/logging.h" |
| #include "pagespeed/kernel/base/abstract_mutex.h" |
| #include "pagespeed/kernel/base/basictypes.h" |
| #include "pagespeed/kernel/base/condvar.h" |
| #include "pagespeed/kernel/base/function.h" |
| #include "pagespeed/kernel/base/scoped_ptr.h" |
| #include "pagespeed/kernel/base/thread_annotations.h" |
| #include "pagespeed/kernel/base/timer.h" |
| |
| namespace net_instaweb { |
| |
| namespace { |
| |
| const int kIndexNotSet = 0; |
| |
| } // namespace |
| |
| // Basic Alarm type (forward declared in the .h file). Note that Alarms are |
| // self-cleaning; it is not valid to make use of an Alarm* after RunAlarm() or |
| // CancelAlarm() has been called. See note below for AddAlarmAtUs. Note also |
| // that Alarms hold the scheduler lock when they are invoked; the alarm drops |
| // the lock before invoking its embedded callback and re-takes it afterwards if |
| // that is necessary. |
| class Scheduler::Alarm { |
| public: |
| virtual void RunAlarm() = 0; |
| virtual void CancelAlarm() = 0; |
| |
| // Compare two alarms, based on wakeup time and insertion order. Result |
| // like strcmp (<0 for this < that, >0 for this > that), based on wakeup |
| // time and index. |
| int Compare(const Alarm* other) const { |
| int cmp = 0; |
| if (this != other) { |
| if (wakeup_time_us_ < other->wakeup_time_us_) { |
| cmp = -1; |
| } else if (wakeup_time_us_ > other->wakeup_time_us_) { |
| cmp = 1; |
| } else if (index_ < other->index_) { |
| cmp = -1; |
| } else { |
| DCHECK(index_ > other->index_); |
| cmp = 1; |
| } |
| } |
| return cmp; |
| } |
| |
| bool in_wait_dispatch() const { return in_wait_dispatch_; } |
| void set_in_wait_dispatch(bool w) { in_wait_dispatch_ = w; } |
| |
| protected: |
| Alarm() : wakeup_time_us_(0), |
| index_(kIndexNotSet), |
| in_wait_dispatch_(false) { } |
| virtual ~Alarm() { } |
| |
| private: |
| friend class Scheduler; |
| int64 wakeup_time_us_; |
| uint32 index_; // Set by scheduler to disambiguate equal wakeup times. |
| |
| // This is used to mark a wait alarm that's being considered by ::Signal |
| // as owned by it for purposes of cleanup, so any concurrent timeout will |
| // know not to delete it. |
| bool in_wait_dispatch_; |
| DISALLOW_COPY_AND_ASSIGN(Alarm); |
| }; |
| |
| namespace { |
| |
| // private class to encapsulate a function being |
| // scheduled as an alarm. Owns passed-in function. |
| class FunctionAlarm : public Scheduler::Alarm { |
| public: |
| explicit FunctionAlarm(Function* function, Scheduler* scheduler) |
| : scheduler_(scheduler), function_(function) { } |
| virtual ~FunctionAlarm() { } |
| |
| virtual void RunAlarm() { |
| DropMutexActAndCleanup(&Function::CallRun); |
| } |
| virtual void CancelAlarm() { |
| DropMutexActAndCleanup(&Function::CallCancel); |
| } |
| |
| private: |
| typedef void (Function::*FunctionAction)(); |
| void DropMutexActAndCleanup(FunctionAction act) NO_THREAD_SAFETY_ANALYSIS { |
| AbstractMutex* mutex = scheduler_->mutex(); // Save across delete. |
| mutex->Unlock(); |
| ((function_)->*(act))(); |
| delete this; |
| mutex->Lock(); |
| } |
| Scheduler* scheduler_; |
| Function* function_; |
| DISALLOW_COPY_AND_ASSIGN(FunctionAlarm); |
| }; |
| |
| } // namespace |
| |
| // The following three classes are effectively supposed to be private, and |
| // should only be used internally to the scheduler, but are semi-exposed due to |
| // C++ naming restrictions. The first two implement condvar waiting. When we |
| // wait using BlockingTimedWait or TimedWait, we put a single alarm into two |
| // queues: the outstanding_alarms_ queue, where it will be run if the wait times |
| // out, and the waiting_alarms_ queue, where it will be canceled if a signal |
| // arrives. The system assumes the waiting_alarms_ queue is a subset of the |
| // outstanding_alarms_ queue, because it holds *only* alarms from ...TimedWait |
| // operations, so on signal the contents of waiting_alarms are cancelled thus |
| // removing them from waiting_alarms and invoking the Cancel() method. However, |
| // on timeout the Run() method must remove the alarm from the waiting_alarms_ |
| // queue so it can be cleaned up safely; doing so means invoking callbacks and |
| // requires us to drop the scheduler lock. This leads to a harmless violation |
| // of the subset condition; see the comment on CancelWaiting which describes the |
| // handling of this condition. |
| |
| // Blocking condvar alarm. Simply sets a flag for the blocking thread to |
| // notice. |
| class Scheduler::CondVarTimeout : public Scheduler::Alarm { |
| public: |
| CondVarTimeout(bool* set_on_timeout, Scheduler* scheduler) |
| : set_on_timeout_(set_on_timeout), |
| scheduler_(scheduler) { } |
| virtual ~CondVarTimeout() { } |
| virtual void RunAlarm() { |
| *set_on_timeout_ = true; |
| scheduler_->CancelWaiting(this); |
| if (!in_wait_dispatch()) { |
| delete this; |
| } |
| } |
| virtual void CancelAlarm() { |
| DCHECK(in_wait_dispatch()); |
| delete this; |
| } |
| private: |
| bool* set_on_timeout_; |
| Scheduler* scheduler_; |
| DISALLOW_COPY_AND_ASSIGN(CondVarTimeout); |
| }; |
| |
| // Non-blocking condvar alarm. Must run the passed-in callback on either |
| // timeout (Run()) or signal (Cancel()). |
| class Scheduler::CondVarCallbackTimeout : public Scheduler::Alarm { |
| public: |
| CondVarCallbackTimeout(Function* callback, Scheduler* scheduler) |
| : callback_(callback), |
| scheduler_(scheduler) { } |
| virtual ~CondVarCallbackTimeout() { } |
| virtual void RunAlarm() { |
| // We may get deleted at tail end of Signal if the lock gets dropped during |
| // CallRun(), so save this into a local. |
| bool saved_in_wait_dispatch = in_wait_dispatch(); |
| scheduler_->CancelWaiting(this); |
| callback_->CallRun(); |
| if (!saved_in_wait_dispatch) { |
| delete this; |
| } |
| } |
| virtual void CancelAlarm() { |
| DCHECK(in_wait_dispatch()); |
| callback_->CallRun(); |
| delete this; |
| } |
| |
| private: |
| Function* callback_; |
| Scheduler* scheduler_; |
| DISALLOW_COPY_AND_ASSIGN(CondVarCallbackTimeout); |
| }; |
| |
| // Comparison on Alarms. |
| bool Scheduler::CompareAlarms::operator()(const Alarm* a, |
| const Alarm* b) const { |
| return a->Compare(b) < 0; |
| } |
| |
| Scheduler::Scheduler(ThreadSystem* thread_system, Timer* timer) |
| : thread_system_(thread_system), |
| timer_(timer), |
| mutex_(thread_system->NewMutex()), |
| condvar_(mutex_->NewCondvar()), |
| index_(kIndexNotSet), |
| signal_count_(0), |
| running_waiting_alarms_(false) { |
| } |
| |
| Scheduler::~Scheduler() { |
| #if SCHEDULER_CANCEL_OUTSTANDING_ALARMS_ON_DESTRUCTION |
| ScopedMutex lock(mutex_.get()); |
| while (!outstanding_alarms_.empty()) { |
| AlarmSet::iterator p = outstanding_alarms_.begin(); |
| Alarm* alarm = *p; |
| outstanding_alarms_.erase(p); |
| alarm->CancelAlarm(); |
| } |
| #endif |
| } |
| |
| void Scheduler::BlockingTimedWaitUs(int64 timeout_us) { |
| mutex_->DCheckLocked(); |
| int64 now_us = timer_->NowUs(); |
| int64 wakeup_time_us = now_us + timeout_us; |
| // We block until signal_count_ changes or we time out. |
| int64 original_signal_count = signal_count_; |
| bool timed_out = false; |
| // Schedule a timeout alarm. |
| CondVarTimeout* alarm = new CondVarTimeout(&timed_out, this); |
| InsertAlarmAtUsMutexHeld(wakeup_time_us, true, alarm); |
| waiting_alarms_.insert(alarm); |
| int64 next_wakeup_us = RunAlarms(NULL); |
| while (signal_count_ == original_signal_count && !timed_out && |
| next_wakeup_us > 0) { |
| // Now we have to block until either we time out, or we are signaled. We |
| // stop when outstanding_alarms_ is empty (and thus RunAlarms(NULL) == 0) as |
| // a belt and suspenders protection against programmer error; this ought to |
| // imply timed_out. |
| AwaitWakeupUntilUs(std::min(wakeup_time_us, next_wakeup_us)); |
| next_wakeup_us = RunAlarms(NULL); |
| } |
| } |
| |
| void Scheduler::TimedWaitMs(int64 timeout_ms, Function* callback) { |
| mutex_->DCheckLocked(); |
| int64 now_us = timer_->NowUs(); |
| int64 completion_time_us = now_us + timeout_ms * Timer::kMsUs; |
| // We create the alarm for this callback, and register it. We also register |
| // the alarm with the signal queue, where the callback will be run on |
| // cancellation. |
| CondVarCallbackTimeout* alarm = new CondVarCallbackTimeout(callback, this); |
| InsertAlarmAtUsMutexHeld(completion_time_us, true, alarm); |
| waiting_alarms_.insert(alarm); |
| RunAlarms(NULL); |
| } |
| |
| void Scheduler::CancelWaiting(Alarm* alarm) { |
| // Called to clean up a [Blocking]TimedWait that timed out. There used to be |
| // a benign race here that meant alarm had been erased from waiting_alarms_ by |
| // a pending Signal operation. Tighter locking on Alarm objects should have |
| // eliminated this hole, but we continue to use presence / absence in |
| // outstanding_alarms_ to resolve signal/cancel races. |
| mutex_->DCheckLocked(); |
| waiting_alarms_.erase(alarm); |
| } |
| |
| void Scheduler::Signal() { |
| mutex_->DCheckLocked(); |
| ++signal_count_; |
| // We have to be careful to not just walk over waiting_alarms_ here |
| // as new entries can be added to it by TimedWait invocations from the |
| // callbacks we run. |
| AlarmSet waiting_alarms_to_dispatch; |
| waiting_alarms_to_dispatch.swap(waiting_alarms_); |
| running_waiting_alarms_ = true; |
| if (!waiting_alarms_to_dispatch.empty()) { |
| // First, mark them all as owned by us, so any concurrent timeouts |
| // that happen while we're releasing the lock to run user code |
| // do not delete them from under us. |
| for (AlarmSet::iterator i = waiting_alarms_to_dispatch.begin(); |
| i != waiting_alarms_to_dispatch.end(); ++i) { |
| (*i)->set_in_wait_dispatch(true); |
| } |
| |
| // Now actually signal those that didn't timeout yet. |
| for (AlarmSet::iterator i = waiting_alarms_to_dispatch.begin(); |
| i != waiting_alarms_to_dispatch.end(); ++i) { |
| if (!CancelAlarm(*i)) { |
| // If CancelAlarm returned false, this means the alarm actually |
| // got run by a timeout. In that case, since we set in_wait_dispatch |
| // to true, it deferred the deletion to us, so take care of it. |
| delete *i; |
| } |
| } |
| } |
| condvar_->Broadcast(); |
| running_waiting_alarms_ = false; |
| RunAlarms(NULL); |
| } |
| |
| // Add alarm while holding mutex. Don't run any alarms or otherwise drop mutex. |
| void Scheduler::InsertAlarmAtUsMutexHeld(int64 wakeup_time_us, |
| bool broadcast_on_wakeup_change, |
| Alarm* alarm) { |
| mutex_->DCheckLocked(); |
| alarm->wakeup_time_us_ = wakeup_time_us; |
| alarm->index_ = ++index_; |
| |
| if (broadcast_on_wakeup_change) { |
| bool wakeup_time_changed = outstanding_alarms_.empty() || |
| (wakeup_time_us < (*outstanding_alarms_.begin())->wakeup_time_us_); |
| if (wakeup_time_changed) { |
| condvar_->Broadcast(); |
| } |
| } |
| |
| outstanding_alarms_.insert(alarm); |
| } |
| |
| Scheduler::Alarm* Scheduler::AddAlarmAtUs(int64 wakeup_time_us, |
| Function* callback) { |
| Alarm* result = new FunctionAlarm(callback, this); |
| ScopedMutex lock(mutex_.get()); |
| InsertAlarmAtUsMutexHeld(wakeup_time_us, true, result); |
| RunAlarms(NULL); |
| return result; |
| } |
| |
| Scheduler::Alarm* Scheduler::AddAlarmAtUsMutexHeld(int64 wakeup_time_us, |
| Function* callback) { |
| Alarm* result = new FunctionAlarm(callback, this); |
| InsertAlarmAtUsMutexHeld(wakeup_time_us, false, result); |
| return result; |
| } |
| |
| bool Scheduler::CancelAlarm(Alarm* alarm) { |
| mutex_->DCheckLocked(); |
| if (outstanding_alarms_.erase(alarm) != 0) { |
| // Note: the following call may drop and re-lock the scheduler mutex. |
| alarm->CancelAlarm(); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| int64 Scheduler::RunAlarms(bool* ran_alarms) { |
| while (!outstanding_alarms_.empty()) { |
| mutex_->DCheckLocked(); |
| // We don't use the iterator to go through the set, because we're dropping |
| // the lock in mid-loop thus permitting new insertions and cancellations. |
| AlarmSet::iterator first_alarm_iterator = outstanding_alarms_.begin(); |
| Alarm* first_alarm = *first_alarm_iterator; |
| int64 now_us = timer_->NowUs(); |
| if (now_us < first_alarm->wakeup_time_us_) { |
| // The next deadline lies in the future. |
| return first_alarm->wakeup_time_us_; |
| } |
| // first_alarm should be run. It can't have been cancelled as we've held |
| // the lock since we found it. |
| outstanding_alarms_.erase(first_alarm_iterator); // Prevent cancellation. |
| if (ran_alarms != NULL) { |
| *ran_alarms = true; |
| } |
| // Note that the following call may drop and re-lock the scheduler lock. |
| first_alarm->RunAlarm(); |
| } |
| return 0; |
| } |
| |
| void Scheduler::AwaitWakeupUntilUs(int64 wakeup_time_us) { |
| mutex_->DCheckLocked(); |
| int64 now_us = timer_->NowUs(); |
| if (wakeup_time_us > now_us) { |
| // Compute how long we should wait. Note: we overshoot, which may lead us |
| // to wake a bit later than expected. We assume the system is likely to |
| // round wakeup time off for us in some arbitrary fashion in any case. |
| int64 wakeup_interval_ms = |
| (wakeup_time_us - now_us + Timer::kMsUs - 1) / Timer::kMsUs; |
| condvar_->TimedWait(wakeup_interval_ms); |
| } |
| } |
| |
| bool Scheduler::ProcessAlarmsOrWaitUs(int64 timeout_us) { |
| mutex_->DCheckLocked(); |
| bool ran_alarms = false; |
| int64 finish_us = timer_->NowUs() + timeout_us; |
| int64 next_wakeup_us = RunAlarms(&ran_alarms); |
| |
| if (timeout_us > 0 && !ran_alarms) { |
| // Note: next_wakeup_us may be 0 here. |
| if (next_wakeup_us == 0 || next_wakeup_us > finish_us) { |
| next_wakeup_us = finish_us; |
| } |
| AwaitWakeupUntilUs(next_wakeup_us); |
| |
| next_wakeup_us = RunAlarms(NULL); |
| } |
| return !outstanding_alarms_.empty(); |
| } |
| |
| // For testing purposes, let a tester know when the scheduler has quiesced. |
| bool Scheduler::NoPendingAlarms() { |
| mutex_->DCheckLocked(); |
| return (outstanding_alarms_.empty()); |
| } |
| |
| SchedulerBlockingFunction::SchedulerBlockingFunction(Scheduler* scheduler) |
| : scheduler_(scheduler), success_(false), done_(false) { |
| set_delete_after_callback(false); |
| } |
| |
| SchedulerBlockingFunction::~SchedulerBlockingFunction() { } |
| |
| void SchedulerBlockingFunction::Run() { |
| success_ = true; |
| Cancel(); |
| } |
| |
| void SchedulerBlockingFunction::Cancel() { |
| ScopedMutex lock(scheduler_->mutex()); |
| done_ = true; |
| scheduler_->Wakeup(); |
| // When this block exits, *this should be considered dead, since the |
| // allocating thread runs Block() and would be able to complete its loop and |
| // return control to the caller who will pop *this from the stack. |
| } |
| |
| bool SchedulerBlockingFunction::Block() { |
| ScopedMutex lock(scheduler_->mutex()); |
| while (!done_) { |
| scheduler_->ProcessAlarmsOrWaitUs(10 * Timer::kSecondUs); |
| } |
| return success_; |
| } |
| |
| void Scheduler::RegisterWorker(QueuedWorkerPool::Sequence* w) {} |
| void Scheduler::UnregisterWorker(QueuedWorkerPool::Sequence* w) {} |
| |
| } // namespace net_instaweb |