blob: 116e9574a3a93bb440adcdd64e7ac82a9c43ef5d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <boost/bind.hpp>
#include "common/atomic.h"
#include "common/status.h"
#include "util/thread.h"
namespace impala {
// Manages a thread that periodically notifies a condition variable. This thread never
// returns. An indicator variable must be specified to guard against spurious wakeups.
//
// Immediately before this class notfies the condition variable, it sets the indicator
// variable to the `wakeup_value` specified in the constructor. It is the responsibility
// of the thread consuming this class to reset the indicator variable to a value other
// than `wakeup_value` before the consuming thread goes to sleep.
//
// If the periodic code takes longer to run than the specified duration, then the code
// will immediately execute the next time around.
//
// Internally, this class uses std::this_thread:sleep_for which may sleep for longer than
// the specified duration due to scheduling or resource contention delays.
// For details, see https://en.cppreference.com/w/cpp/thread/sleep_for.
//
// Example usage:
//
// #include <chrono>
// #include <condition_variable>
// #include <memory>
// #include <mutex>
//
// #include "common/status.h"
//
// std::condition_variable cv;
// std::mutex mu;
// std::shared_ptr<bool> wakeup_guard = make_shared<bool>();
// Ticker<std::chrono::seconds, bool> ticker(std::chrono::seconds(30), cv, mu,
// wakeup_guard, true);
//
// ABORT_IF_ERROR(ticker.Start());
//
// while(true) {
// unique_lock<mutex> l(mu);
// cv.wait(l, ticker.WakeupGuard());
// *wakeup_guard = false;
//
// run_my_code();
// }
template <typename DurationType, typename IndicatorType>
class Ticker {
public:
Ticker(DurationType interval, std::condition_variable& cv,
std::mutex& lock, std::shared_ptr<IndicatorType> indicator,
IndicatorType wakeup_value) : interval_(interval), cv_(cv), lock_(lock),
indicator_(indicator), wakeup_value_(wakeup_value) {}
Status Start(const std::string& category, const std::string& name) {
return Thread::Create(category, name, &Ticker::run, this, &my_thread_);
}
// Specify that the next iteration of this ticker be the last. This function does not
// block nor does it cause the ticker to wake up earlier than scheduled.
void RequestStop() {
stop_requested_.Store(true);
}
// Wait for the ticker to exit after it's final iteration.
void Join() {
my_thread_->Join();
}
// Provides a default implementation for the condition variable predicate lambda.
std::function<bool()> WakeupGuard() {
return [this]{ return *indicator_ == wakeup_value_; };
}
protected:
const DurationType interval_;
std::condition_variable& cv_;
std::mutex& lock_;
std::shared_ptr<IndicatorType> indicator_;
const IndicatorType wakeup_value_;
private:
std::unique_ptr<Thread> my_thread_;
AtomicBool stop_requested_;
void run() {
while (!stop_requested_.Load()) {
std::this_thread::sleep_for(interval_);
{
std::lock_guard<std::mutex> l(lock_);
*indicator_ = wakeup_value_;
}
cv_.notify_all();
}
}
}; // class Ticker
// Specialization of the Ticker class that uses seconds for the duration and bool as the
// wakeup indicator. The boolean shared_ptr indicator is internally managed. Use the
// ResetWakeupGuard() function in your code immediately after the condition variable wait
// to set the internally managed wakeup guard for the next iteration.
class TickerSecondsBool : public Ticker<std::chrono::seconds, bool> {
public:
TickerSecondsBool(uint32_t interval, std::condition_variable& cv,
std::mutex& lock) :
Ticker(std::chrono::seconds(interval), cv, lock, std::make_shared<bool>(), true) {}
void ResetWakeupGuard() {
*indicator_ = false;
}
}; // class TickerSecondsBool
} // namespace impala