blob: c31d4154d33388e274bfa092f70b918eacd51f0a [file] [log] [blame]
// Copyright 2011 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Author: jmarantz@google.com (Joshua Marantz)
#ifndef PAGESPEED_KERNEL_THREAD_SCHEDULER_H_
#define PAGESPEED_KERNEL_THREAD_SCHEDULER_H_
#include <set>
#include "pagespeed/kernel/base/basictypes.h"
#include "pagespeed/kernel/base/condvar.h"
#include "pagespeed/kernel/base/function.h"
#include "pagespeed/kernel/base/scoped_ptr.h"
#include "pagespeed/kernel/base/thread_annotations.h"
#include "pagespeed/kernel/base/thread_system.h"
#include "pagespeed/kernel/base/timer.h"
#include "pagespeed/kernel/thread/queued_worker_pool.h"
// TODO(jmarantz): The Scheduler should cancel all outstanding operations
// on destruction. Deploying this requires further analysis of shutdown
// ordering.
#define SCHEDULER_CANCEL_OUTSTANDING_ALARMS_ON_DESTRUCTION 0
namespace net_instaweb {
// Implements a simple scheduler that allows a thread to block until either time
// expires, or a condition variable is signaled. Also permits various alarms to
// be scheduled; these are lightweight short-lived callbacks that must be safely
// runnable from any thread in any lock state in which scheduler invocations
// occur. Finally, implements a hybrid between these: a callback that can be
// run when the condition variable is signaled.
//
// This class is designed to be overridden, but only to re-implement its
// internal notion of blocking to permit time to be mocked by MockScheduler.
class Scheduler {
public:
// A callback for a scheduler alarm, with an associated wakeup time (absolute
// time after which the callback will be invoked with Run() by the scheduler).
// Alarm should be treated as an opaque type.
class Alarm;
// Sorting comparator for Alarms, so that they can be retrieved in time
// order. For use by std::set, thus public.
struct CompareAlarms {
bool operator()(const Alarm* a, const Alarm* b) const;
};
Scheduler(ThreadSystem* thread_system, Timer* timer);
virtual ~Scheduler();
ThreadSystem::CondvarCapableMutex* mutex() LOCK_RETURNED(mutex_) {
return mutex_.get();
}
// Optionally check that mutex is locked for debugging purposes.
void DCheckLocked() EXCLUSIVE_LOCKS_REQUIRED(mutex()) {
mutex_->DCheckLocked();
}
// Condition-style methods: The following three methods provide a simple
// condition-variable-style interface that can be used to coordinate the
// threads sharing the scheduler.
// Wait at most timeout_ms, or until Signal() is called.
void BlockingTimedWaitMs(int64 timeout_ms) EXCLUSIVE_LOCKS_REQUIRED(mutex()) {
BlockingTimedWaitUs(timeout_ms * Timer::kMsUs);
}
void BlockingTimedWaitUs(int64 timeout_us) EXCLUSIVE_LOCKS_REQUIRED(mutex());
// Non-blocking invocation of callback either when Signal() is called, or
// after timeout_ms have passed. Ownership of callback passes to the
// scheduler, which deallocates it after invocation. mutex() must be held on
// the initial call, and is locked for the duration of callback. Note that
// callback may be invoked in a different thread from the calling thread.
void TimedWaitMs(int64 timeout_ms, Function* callback)
EXCLUSIVE_LOCKS_REQUIRED(mutex());
// Signal threads in BlockingTimedWait[Ms,Us] and invoke TimedWaitMs
// callbacks. Performs outstanding work, including any triggered by the
// signal, before returning; note that this means it may drop the scheduler
// lock internally while doing callback invocation, which is different from
// the usual condition variable signal semantics.
void Signal() EXCLUSIVE_LOCKS_REQUIRED(mutex());
// Alarms. The following two methods provide a mechanism for scheduling
// alarm tasks, each run at a particular time.
// Schedules an alarm for absolute time wakeup_time_us, using the passed-in
// Function* as the alarm callback. Returns the created Alarm. Performs
// outstanding work. The returned alarm will own the callback and will clean
// itself and the callback when it is run or cancelled. NOTE in particular
// that calls to CancelAlarm must ensure the callback has not been invoked
// yet. This is why the scheduler mutex must be held for CancelAlarm.
//
// Will wakeup the scheduler if the time of the first alarm changed.
// It will also run any outstanding alarms. Both of these
// operations will result in temporarily dropping the lock. See also
// AddAlarmMutexHelp.
//
// TODO(jmaessen): Get rid of AddAlarmAtUs, or rename to
// AddAlarmAtUsAndRunOutstanding or similar.
Alarm* AddAlarmAtUs(int64 wakeup_time_us, Function* callback)
LOCKS_EXCLUDED(mutex());
// Adds a new alarm. Does not run any alarms, broadcast, or drop locks.
// See doc for AddAlarmAtUs.
Alarm* AddAlarmAtUsMutexHeld(int64 wakeup_time_us, Function* callback)
EXCLUSIVE_LOCKS_REQUIRED(mutex());
// Cancels an alarm, calling the Cancel() method and deleting the alarm
// object. Scheduler mutex must be held before call to ensure that alarm is
// not called back before cancellation occurs. Doesn't perform outstanding
// work. Returns true if the cancellation occurred. If false is returned,
// the alarm is already being run / has been run in another thread; if the
// alarm deletes itself on Cancel(), it may no longer safely be used.
//
// Note that once the user callback for the alarm returns it's no longer
// safe to call this (but this method is safe to call when the scheduler has
// committed to running the callback, it will just return false), so it's the
// caller's responsibility to properly synchronize between its callback and
// its invocation of this.
bool CancelAlarm(Alarm* alarm) EXCLUSIVE_LOCKS_REQUIRED(mutex());
// Finally, ProcessAlarmsOrWaitUs provides a mechanism to ensure that pending
// alarms are executed in the absence of other scheduler activity.
// ProcessAlarmsOrWaitUs: handle outstanding alarms, or if there are none wait
// until the next wakeup and handle alarms then before relinquishing control.
// Idle no longer than timeout_us. Passing in timeout_us=0 will run without
// blocking.
//
// Returns true if the scheduler has pending activities remaining, either
// runnable now or in the future.
bool ProcessAlarmsOrWaitUs(int64 timeout_us)
EXCLUSIVE_LOCKS_REQUIRED(mutex());
// Obtain the timer that the scheduler is using internally. Important if you
// and the scheduler want to agree on the passage of time.
Timer* timer() { return timer_; }
// Obtain the thread system used by the scheduler.
ThreadSystem* thread_system() { return thread_system_; }
// Internal method to kick the system because something of interest to the
// overridden AwaitWakeup method has happened. Exported here because C++
// naming hates you.
void Wakeup() { condvar_->Broadcast(); }
// These methods notify the scheduler of work sequences that may run work
// on it. They are only used for time simulations in MockScheduler and
// are no-ops during normal usage.
virtual void RegisterWorker(QueuedWorkerPool::Sequence* w);
virtual void UnregisterWorker(QueuedWorkerPool::Sequence* w);
// Run any alarms that have reached their deadline. Returns the time of the
// next deadline, or 0 if no further deadlines loom. Sets *ran_alarms if
// non-NULL and any alarms were run, otherwise leaves it untouched.
int64 RunAlarms(bool* ran_alarms) EXCLUSIVE_LOCKS_REQUIRED(mutex());
protected:
// Internal method to await a wakeup event. Block until wakeup_time_us (an
// absolute time since the epoch), or until something interesting (such as a
// call to Signal) occurs. This is virtual to permit us to mock it out (the
// mock simply advances time). This maybe called with 0 in case where there
// are no timers currently active.
virtual void AwaitWakeupUntilUs(int64 wakeup_time_us);
bool running_waiting_alarms() const { return running_waiting_alarms_; }
private:
class CondVarTimeout;
class CondVarCallbackTimeout;
friend class SchedulerTest;
typedef std::set<Alarm*, CompareAlarms> AlarmSet;
// Inserts an alarm, optionally broadcasting if the wakeup time has
// changed.
void InsertAlarmAtUsMutexHeld(int64 wakeup_time_us,
bool broadcast_on_wakeup_change,
Alarm* alarm);
void CancelWaiting(Alarm* alarm);
bool NoPendingAlarms();
ThreadSystem* thread_system_;
Timer* timer_;
scoped_ptr<ThreadSystem::CondvarCapableMutex> mutex_;
// condvar_ tracks whether interesting (next-wakeup decreasing or
// signal_count_ increasing) events occur.
scoped_ptr<ThreadSystem::Condvar> condvar_;
uint32 index_; // Used to disambiguate alarms with equal deadlines
AlarmSet outstanding_alarms_; // Priority queue of future alarms
// An alarm may be deleted iff it is successfully removed from
// outstanding_alarms_.
int64 signal_count_; // Number of times Signal has been called
AlarmSet waiting_alarms_; // Alarms waiting for signal_count to change
bool running_waiting_alarms_; // True if we're in process of invoking
// user callbacks...
DISALLOW_COPY_AND_ASSIGN(Scheduler);
};
// A simple adapter class that permits blocking until an alarm has been run or
// cancelled. Designed for stack allocation.
//
// Note that success_ is guarded by the acquire/release semantics of
// atomic_bool and by monotonicity of done_. Field order (==initialization
// order) is important here.
class SchedulerBlockingFunction : public Function {
public:
explicit SchedulerBlockingFunction(Scheduler* scheduler);
virtual ~SchedulerBlockingFunction();
virtual void Run();
virtual void Cancel();
// Block until called back, returning true for Run and false for Cancel.
bool Block();
private:
Scheduler* scheduler_;
bool success_;
bool done_; // protected by scheduler_->mutex()
DISALLOW_COPY_AND_ASSIGN(SchedulerBlockingFunction);
};
} // namespace net_instaweb
#endif // PAGESPEED_KERNEL_THREAD_SCHEDULER_H_