blob: a3a57708fef9320bd62c08261a00454bf22c4b36 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <gtest/gtest_prod.h>
#include "kudu/gutil/macros.h"
#include "kudu/util/locks.h"
#include "kudu/util/make_shared.h"
#include "kudu/util/monotime.h"
#include "kudu/util/random.h"
namespace kudu {
namespace rpc {
class Messenger;
// Repeatedly runs a task on a fixed period.
// PeriodicTimer's periodicity is maintained via tail recursive calls to
// Messenger::ScheduleOnReactor(). Every time the scheduled callback is
// invoked, it checks the current time, updates some internal bookkeeping,
// runs the user's task if the time is right, and makes another call to
// Messenger::ScheduleOnReactor() to run itself again in the future. This
// looping behavior is called a "callback loop".
// Every time Stop() and then Start() (or just Start(), if this is the first
// such call) are invoked, PeriodicTimer will kick off a new callback loop. If
// there was an old loop, it remains intact until its scheduled callback runs,
// at which point it will detect that a new loop was created and exit.
// The use of Messenger::ScheduleOnReactor() is merely for convenience;
// PeriodicTimer could also be built on libev, a hashed wheel timer, o
// something equivalent.
// PeriodicTimers have shared ownership, but that's largely an implementation
// detail to support asynchronous stopping. Users can treat them as exclusively
// owned (though care must be taken when writing the task functor; see Stop()
// for more details).
// TODO(adar): eventually we should build directly on libev as it supports
// timer cancelation, which would allow us to implement synchronous Stop(), use
// exclusive ownership, and remove the restriction that the delta passed
// into Snooze() be greater than GetMinimumPeriod().
class PeriodicTimer : public std::enable_shared_from_this<PeriodicTimer>,
public enable_make_shared<PeriodicTimer> {
typedef std::function<void(void)> RunTaskFunctor;
struct Options {
// Defines the percentage of the period that will be jittered up or down
// randomly. Together with the period, the periodicity of the timer will
// vary between (1-J)*P and (1+J)*P.
// Must be between 0 and 1.
// If not set, defaults to 0.25.
double jitter_pct;
// The timer will automatically stop after running the user's task.
// Just as with a normal timer, Snooze() will postpone the running of the
// task, and Stop() will cancel the task outright. Unlike a normal timer,
// both operations will no-op if the timer has already fired.
// If not set, defaults to false.
bool one_shot;
// Creates a new PeriodicTimer.
// A ref is taken on 'messenger', which is used for scheduling callbacks.
// 'functor' defines the user's task and is owned for the lifetime of the
// PeriodicTimer. The task will run on the messenger's reactor threads so it
// should do very little work (i.e. no I/O).
// 'period' defines the period between tasks.
// 'options' allows additional (optional) customization of the timer.
static std::shared_ptr<PeriodicTimer> Create(
std::shared_ptr<Messenger> messenger,
RunTaskFunctor functor,
MonoDelta period,
Options options = {});
// Starts the timer.
// The timer's task will run in accordance with the period and jitter mode
// provided during timer construction.
// If 'next_task_delta' is set, it is used verbatim as the delay for the very
// first task, with the configured period and jitter mode only applying to
// subsequent tasks.
// Does nothing if the timer was already started.
void Start(std::optional<MonoDelta> next_task_delta = std::nullopt);
// Snoozes the timer for one period.
// If 'next_task_delta' is set, it is used verbatim as the delay for the next
// task. Subsequent tasks will revert to the timer's regular period. The
// value of 'next_task_delta' must be greater than GetMinimumPeriod();
// otherwise the task is not guaranteed to run in a timely manner.
// Note: Snooze() is not additive. That is, if called at time X and again at
// time X + P/2, the timer is snoozed until X+P/2+P, not X+2P.
// Does nothing if the timer is stopped.
void Snooze(std::optional<MonoDelta> next_task_delta = std::nullopt);
// Stops the timer.
// Stopping is asynchronous; that is, it is still possible for the task to
// run after Stop() returns. Because of this, the task's functor should be
// written to do nothing if objects it depends on have been destroyed.
// Does nothing if the timer is already stopped.
void Stop();
// Returns true iff the timer has been started.
bool started() const;
PeriodicTimer(std::shared_ptr<Messenger> messenger,
RunTaskFunctor functor,
MonoDelta period,
Options options);
FRIEND_TEST(PeriodicTimerTest, TestCallbackRestartsTimer);
// Calculate the minimum period for the timer, which varies depending on
// 'jitter_pct_' and the output of the PRNG.
MonoDelta GetMinimumPeriod();
// Called by Messenger::ScheduleOnReactor when the timer fires.
// 'my_callback_generation' is the callback generation assigned to this loop
// when it was constructed.
void Callback(int64_t my_callback_generation);
// Like Stop() but must be called with 'lock_' held.
void StopUnlocked();
// Like Snooze() but must be called with 'lock_' held.
void SnoozeUnlocked(std::optional<MonoDelta> next_task_delta = std::nullopt);
// Returns the number of times that Callback() has been called by this timer.
// Should only be used for tests!
int64_t NumCallbacksForTests() const;
// Schedules invocations of Callback() in the future.
std::shared_ptr<Messenger> messenger_;
// User-defined task functor.
RunTaskFunctor functor_;
// User-specified task period.
const MonoDelta period_;
// User-specified options.
const Options options_;
// Protects all mutable state below.
mutable simple_spinlock lock_;
// PRNG used when generating jitter.
Random rng_;
// The next time at which the task's functor should be run.
MonoTime next_task_time_;
// The most recent callback generation.
// When started, a callback loop is assigned a generation, which it remembers
// for its entire lifespan. If 'current_callback_generation_' exceeds the
// loop's assigned generation, that means another loop has been created and
// the (now old) loop should exit.
int64_t current_callback_generation_;
// The number of times that Callback() has been invoked.
int64_t num_callbacks_for_tests_;
// Whether the timer is running or not.
bool started_;
} // namespace rpc
} // namespace kudu