| // Copyright (c) 2013, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| // |
| // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. See the AUTHORS file for names of contributors. |
| |
| #include "monitoring/histogram_windowing.h" |
| #include "monitoring/histogram.h" |
| #include "util/cast_util.h" |
| |
| #include <algorithm> |
| |
| namespace rocksdb { |
| |
| HistogramWindowingImpl::HistogramWindowingImpl() { |
| env_ = Env::Default(); |
| window_stats_.reset(new HistogramStat[num_windows_]); |
| Clear(); |
| } |
| |
| HistogramWindowingImpl::HistogramWindowingImpl( |
| uint64_t num_windows, |
| uint64_t micros_per_window, |
| uint64_t min_num_per_window) : |
| num_windows_(num_windows), |
| micros_per_window_(micros_per_window), |
| min_num_per_window_(min_num_per_window) { |
| env_ = Env::Default(); |
| window_stats_.reset(new HistogramStat[num_windows_]); |
| Clear(); |
| } |
| |
| HistogramWindowingImpl::~HistogramWindowingImpl() { |
| } |
| |
| void HistogramWindowingImpl::Clear() { |
| std::lock_guard<std::mutex> lock(mutex_); |
| |
| stats_.Clear(); |
| for (size_t i = 0; i < num_windows_; i++) { |
| window_stats_[i].Clear(); |
| } |
| current_window_.store(0, std::memory_order_relaxed); |
| last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed); |
| } |
| |
| bool HistogramWindowingImpl::Empty() const { return stats_.Empty(); } |
| |
| // This function is designed to be lock free, as it's in the critical path |
| // of any operation. |
| // Each individual value is atomic, it is just that some samples can go |
| // in the older bucket which is tolerable. |
| void HistogramWindowingImpl::Add(uint64_t value){ |
| TimerTick(); |
| |
| // Parent (global) member update |
| stats_.Add(value); |
| |
| // Current window update |
| window_stats_[current_window()].Add(value); |
| } |
| |
| void HistogramWindowingImpl::Merge(const Histogram& other) { |
| if (strcmp(Name(), other.Name()) == 0) { |
| Merge( |
| *static_cast_with_check<const HistogramWindowingImpl, const Histogram>( |
| &other)); |
| } |
| } |
| |
| void HistogramWindowingImpl::Merge(const HistogramWindowingImpl& other) { |
| std::lock_guard<std::mutex> lock(mutex_); |
| stats_.Merge(other.stats_); |
| |
| if (stats_.num_buckets_ != other.stats_.num_buckets_ || |
| micros_per_window_ != other.micros_per_window_) { |
| return; |
| } |
| |
| uint64_t cur_window = current_window(); |
| uint64_t other_cur_window = other.current_window(); |
| // going backwards for alignment |
| for (unsigned int i = 0; |
| i < std::min(num_windows_, other.num_windows_); i++) { |
| uint64_t window_index = |
| (cur_window + num_windows_ - i) % num_windows_; |
| uint64_t other_window_index = |
| (other_cur_window + other.num_windows_ - i) % other.num_windows_; |
| |
| window_stats_[window_index].Merge(other.window_stats_[other_window_index]); |
| } |
| } |
| |
| std::string HistogramWindowingImpl::ToString() const { |
| return stats_.ToString(); |
| } |
| |
| double HistogramWindowingImpl::Median() const { |
| return Percentile(50.0); |
| } |
| |
| double HistogramWindowingImpl::Percentile(double p) const { |
| // Retry 3 times in total |
| for (int retry = 0; retry < 3; retry++) { |
| uint64_t start_num = stats_.num(); |
| double result = stats_.Percentile(p); |
| // Detect if swap buckets or Clear() was called during calculation |
| if (stats_.num() >= start_num) { |
| return result; |
| } |
| } |
| return 0.0; |
| } |
| |
| double HistogramWindowingImpl::Average() const { |
| return stats_.Average(); |
| } |
| |
| double HistogramWindowingImpl::StandardDeviation() const { |
| return stats_.StandardDeviation(); |
| } |
| |
| void HistogramWindowingImpl::Data(HistogramData * const data) const { |
| stats_.Data(data); |
| } |
| |
| void HistogramWindowingImpl::TimerTick() { |
| uint64_t curr_time = env_->NowMicros(); |
| if (curr_time - last_swap_time() > micros_per_window_ && |
| window_stats_[current_window()].num() >= min_num_per_window_) { |
| SwapHistoryBucket(); |
| } |
| } |
| |
| void HistogramWindowingImpl::SwapHistoryBucket() { |
| // Threads executing Add() would be competing for this mutex, the first one |
| // who got the metex would take care of the bucket swap, other threads |
| // can skip this. |
| // If mutex is held by Merge() or Clear(), next Add() will take care of the |
| // swap, if needed. |
| if (mutex_.try_lock()) { |
| last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed); |
| |
| uint64_t curr_window = current_window(); |
| uint64_t next_window = (curr_window == num_windows_ - 1) ? |
| 0 : curr_window + 1; |
| |
| // subtract next buckets from totals and swap to next buckets |
| HistogramStat& stats_to_drop = window_stats_[next_window]; |
| |
| if (!stats_to_drop.Empty()) { |
| for (size_t b = 0; b < stats_.num_buckets_; b++){ |
| stats_.buckets_[b].fetch_sub( |
| stats_to_drop.bucket_at(b), std::memory_order_relaxed); |
| } |
| |
| if (stats_.min() == stats_to_drop.min()) { |
| uint64_t new_min = std::numeric_limits<uint64_t>::max(); |
| for (unsigned int i = 0; i < num_windows_; i++) { |
| if (i != next_window) { |
| uint64_t m = window_stats_[i].min(); |
| if (m < new_min) new_min = m; |
| } |
| } |
| stats_.min_.store(new_min, std::memory_order_relaxed); |
| } |
| |
| if (stats_.max() == stats_to_drop.max()) { |
| uint64_t new_max = 0; |
| for (unsigned int i = 0; i < num_windows_; i++) { |
| if (i != next_window) { |
| uint64_t m = window_stats_[i].max(); |
| if (m > new_max) new_max = m; |
| } |
| } |
| stats_.max_.store(new_max, std::memory_order_relaxed); |
| } |
| |
| stats_.num_.fetch_sub(stats_to_drop.num(), std::memory_order_relaxed); |
| stats_.sum_.fetch_sub(stats_to_drop.sum(), std::memory_order_relaxed); |
| stats_.sum_squares_.fetch_sub( |
| stats_to_drop.sum_squares(), std::memory_order_relaxed); |
| |
| stats_to_drop.Clear(); |
| } |
| |
| // advance to next window bucket |
| current_window_.store(next_window, std::memory_order_relaxed); |
| |
| mutex_.unlock(); |
| } |
| } |
| |
| } // namespace rocksdb |