//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "monitoring/histogram_windowing.h"
#include "monitoring/histogram.h"
#include "util/cast_util.h"

#include <algorithm>
#include <limits>
#include <mutex>

namespace rocksdb {

// Default constructor. Uses whatever value num_windows_ already holds when
// the body runs (presumably an in-class default from the header -- confirm
// there) to size the per-window histogram array.
HistogramWindowingImpl::HistogramWindowingImpl() {
  // One HistogramStat per window.
  window_stats_.reset(new HistogramStat[num_windows_]);

  // Clock source used for window timing.
  env_ = Env::Default();

  // Zero all histograms and start the first window now. Needs both env_ and
  // window_stats_ to be set up already.
  Clear();
}

// Constructs a windowed histogram with an explicit configuration.
//
//   num_windows        - number of per-window histograms to allocate
//   micros_per_window  - stored in micros_per_window_; TimerTick() compares
//                        the elapsed time since the last swap against it
//   min_num_per_window - stored in min_num_per_window_; TimerTick() requires
//                        the current window to hold at least this many
//                        samples before swapping
HistogramWindowingImpl::HistogramWindowingImpl(uint64_t num_windows,
                                               uint64_t micros_per_window,
                                               uint64_t min_num_per_window)
    : num_windows_(num_windows),
      micros_per_window_(micros_per_window),
      min_num_per_window_(min_num_per_window) {
  // One HistogramStat per window.
  window_stats_.reset(new HistogramStat[num_windows_]);

  // Clock source used for window timing.
  env_ = Env::Default();

  // Zero all histograms and start the first window now. Needs both env_ and
  // window_stats_ to be set up already.
  Clear();
}

// Nothing to release by hand: the destructor body is empty, so members clean
// up through their own destructors.
HistogramWindowingImpl::~HistogramWindowingImpl() = default;

// Resets the histogram to its initial state while holding mutex_: clears the
// aggregate stats and every per-window histogram, makes window 0 the current
// window, and restarts the window timer from the current time.
void HistogramWindowingImpl::Clear() {
  std::lock_guard<std::mutex> guard(mutex_);

  // Aggregate first, then each window in index order.
  stats_.Clear();
  for (size_t window = 0; window < num_windows_; ++window) {
    window_stats_[window].Clear();
  }

  // Rewind to the first window and stamp the start of its time slice.
  current_window_.store(0, std::memory_order_relaxed);
  const uint64_t now_micros = env_->NowMicros();
  last_swap_time_.store(now_micros, std::memory_order_relaxed);
}

// Returns whether the aggregate histogram (stats_) reports itself as empty.
bool HistogramWindowingImpl::Empty() const {
  const bool aggregate_is_empty = stats_.Empty();
  return aggregate_is_empty;
}

// Records one sample.
//
// This sits on the critical path of every operation, so it is designed to be
// lock free and takes no lock itself. Each individual update is atomic; the
// only imprecision is that a sample may occasionally be credited to an older
// window bucket, which is tolerable.
void HistogramWindowingImpl::Add(uint64_t value) {
  // Give the window a chance to rotate before recording.
  TimerTick();

  // Aggregate (all-windows) histogram.
  stats_.Add(value);

  // Per-window histogram. The index is read only now, after TimerTick(), so
  // a rotation performed above is picked up.
  const uint64_t active_window = current_window();
  window_stats_[active_window].Add(value);
}

// Merges a generic Histogram into this one. The merge only happens when
// `other` reports the same Name() as this object; otherwise the call is a
// no-op.
void HistogramWindowingImpl::Merge(const Histogram& other) {
  const bool same_kind = (strcmp(Name(), other.Name()) == 0);
  if (!same_kind) {
    return;
  }

  // Names match, so downcast and delegate to the typed overload.
  const auto* windowed_other =
      static_cast_with_check<const HistogramWindowingImpl, const Histogram>(
          &other);
  Merge(*windowed_other);
}

void HistogramWindowingImpl::Merge(const HistogramWindowingImpl& other) {
  std::lock_guard<std::mutex> lock(mutex_);
  stats_.Merge(other.stats_);

  if (stats_.num_buckets_ != other.stats_.num_buckets_ ||
      micros_per_window_ != other.micros_per_window_) {
    return;
  }

  uint64_t cur_window = current_window();
  uint64_t other_cur_window = other.current_window();
  // going backwards for alignment
  for (unsigned int i = 0;
                    i < std::min(num_windows_, other.num_windows_); i++) {
    uint64_t window_index =
        (cur_window + num_windows_ - i) % num_windows_;
    uint64_t other_window_index =
        (other_cur_window + other.num_windows_ - i) % other.num_windows_;

    window_stats_[window_index].Merge(other.window_stats_[other_window_index]);
  }
}

// Returns the string rendering produced by the aggregate histogram (stats_).
std::string HistogramWindowingImpl::ToString() const {
  std::string rendered = stats_.ToString();
  return rendered;
}

// Returns the median, computed as the 50th percentile via Percentile().
double HistogramWindowingImpl::Median() const {
  constexpr double kMedianPercentile = 50.0;
  return Percentile(kMedianPercentile);
}

// Returns the p-th percentile of the aggregate histogram.
//
// No lock is taken here. Instead the sample count is read before and after
// the computation: if it shrank, a bucket swap or Clear() ran in the middle
// and the result is discarded. The computation is attempted at most three
// times; if every attempt is disturbed, 0.0 is returned.
double HistogramWindowingImpl::Percentile(double p) const {
  constexpr int kMaxAttempts = 3;

  int attempt = 0;
  while (attempt < kMaxAttempts) {
    const uint64_t count_before = stats_.num();
    const double candidate = stats_.Percentile(p);

    // The count did not shrink, so accept this result.
    if (stats_.num() >= count_before) {
      return candidate;
    }
    ++attempt;
  }

  return 0.0;
}

// Returns the average reported by the aggregate histogram (stats_).
double HistogramWindowingImpl::Average() const {
  const double mean = stats_.Average();
  return mean;
}

// Returns the standard deviation reported by the aggregate histogram
// (stats_).
double HistogramWindowingImpl::StandardDeviation() const {
  const double std_dev = stats_.StandardDeviation();
  return std_dev;
}

// Fills `data` by forwarding it to the aggregate histogram's Data(); the
// pointer is passed through unchanged.
void HistogramWindowingImpl::Data(HistogramData* const data) const {
  stats_.Data(data);
}

// Rotates to the next window when both conditions hold:
//   1. more than micros_per_window_ microseconds have passed since the last
//      swap, and
//   2. the current window already holds at least min_num_per_window_ samples.
// The sample count is only examined when the time condition is met.
void HistogramWindowingImpl::TimerTick() {
  const uint64_t now_micros = env_->NowMicros();

  const bool window_expired =
      now_micros - last_swap_time() > micros_per_window_;
  if (!window_expired) {
    return;
  }

  const bool too_few_samples =
      window_stats_[current_window()].num() < min_num_per_window_;
  if (too_few_samples) {
    return;
  }

  SwapHistoryBucket();
}

// Advances to the next window, first removing that window's old contents
// from the aggregate stats so the aggregate only covers the retained windows.
//
// Threads executing Add() compete for mutex_ here. The first one to get the
// mutex performs the bucket swap; the others skip it and return immediately.
// If the mutex is held by Merge() or Clear(), the swap is skipped as well and
// the next Add() will take care of it, if still needed.
void HistogramWindowingImpl::SwapHistoryBucket() {
  // Non-blocking acquisition. The RAII guard releases the mutex on every exit
  // path, so no explicit unlock() is needed.
  std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
  if (!lock.owns_lock()) {
    return;
  }

  last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);

  // The window array is used as a ring: wrap to 0 after the last index.
  const uint64_t curr_window = current_window();
  const uint64_t next_window =
      (curr_window == num_windows_ - 1) ? 0 : curr_window + 1;

  // The next window still holds the oldest data; subtract it from the totals
  // before it is reused.
  HistogramStat& stats_to_drop = window_stats_[next_window];

  if (!stats_to_drop.Empty()) {
    for (size_t b = 0; b < stats_.num_buckets_; b++) {
      stats_.buckets_[b].fetch_sub(stats_to_drop.bucket_at(b),
                                   std::memory_order_relaxed);
    }

    // If the dropped window's min equals the aggregate min, recompute the
    // aggregate min from the remaining windows.
    if (stats_.min() == stats_to_drop.min()) {
      uint64_t new_min = std::numeric_limits<uint64_t>::max();
      for (unsigned int i = 0; i < num_windows_; i++) {
        if (i != next_window) {
          const uint64_t m = window_stats_[i].min();
          if (m < new_min) new_min = m;
        }
      }
      stats_.min_.store(new_min, std::memory_order_relaxed);
    }

    // Same treatment for the aggregate max.
    if (stats_.max() == stats_to_drop.max()) {
      uint64_t new_max = 0;
      for (unsigned int i = 0; i < num_windows_; i++) {
        if (i != next_window) {
          const uint64_t m = window_stats_[i].max();
          if (m > new_max) new_max = m;
        }
      }
      stats_.max_.store(new_max, std::memory_order_relaxed);
    }

    stats_.num_.fetch_sub(stats_to_drop.num(), std::memory_order_relaxed);
    stats_.sum_.fetch_sub(stats_to_drop.sum(), std::memory_order_relaxed);
    stats_.sum_squares_.fetch_sub(stats_to_drop.sum_squares(),
                                  std::memory_order_relaxed);

    stats_to_drop.Clear();
  }

  // Publish the new current window; Add() calls after this point record into
  // it.
  current_window_.store(next_window, std::memory_order_relaxed);
}

}  // namespace rocksdb
