// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <algorithm>
#include <cstdint>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include <boost/function.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/locks.hpp>
#include <gtest/gtest_prod.h> // for FRIEND_TEST
#include <rapidjson/fwd.h>

#include "common/atomic.h"
#include "common/logging.h"
#include "common/object-pool.h"
#include "common/status.h"
#include "kudu/util/web_callback_registry.h"
#include "util/debug-util.h"
#include "util/metrics-fwd.h"
#include "util/spinlock.h"
#include "util/webserver.h"

using kudu::HttpStatusCode;

namespace impala {

/// Singleton giving access to the metric definitions. The definitions are authored in
/// metrics.json, from which generate_metrics.py produces MetricDefs.thrift; this class
/// wraps one instance of those thrift definitions.
class MetricDefs {
 public:
  /// Returns the TMetricDef registered under 'key'. For metrics whose definition is a
  /// format string, 'arg' optionally supplies the argument for that format string.
  /// 'key' must exist, otherwise a DCHECK fails.
  /// TODO: Support multiple arguments.
  static TMetricDef Get(const std::string& key, const std::string& arg = "");

 private:
  friend class MetricsTest;

  /// Construction is private: instances are only obtained through GetInstance().
  MetricDefs() = default;
  DISALLOW_COPY_AND_ASSIGN(MetricDefs);

  /// Returns the MetricDefs singleton.
  static MetricDefs* GetInstance();

  /// Holds the map of all TMetricDefs. Deliberately non-const so tests can modify it.
  MetricDefsConstants metric_defs_;
};

/// A metric is a container for some value, identified by a string key. Most metrics are
/// numeric, but this metric base-class is general enough such that metrics may be lists,
/// maps, histograms or other arbitrary structures.
///
/// Metrics must be able to convert themselves to JSON (for integration with our
/// monitoring tools, and for rendering in webpages). See ToJson(), and also
/// ToLegacyJson() which ensures backwards compatibility with older versions of CM.
///
/// Metrics should be supplied with a description, which is included in JSON output for
/// display by monitoring systems / Impala's webpages.
///
/// TODO: Add ToThrift() for conversion to an RPC-friendly format.
class Metric {
 public:
  /// Virtual so that metrics can be destroyed through a Metric pointer.
  virtual ~Metric() = default;

  /// Builds a new Value into 'val', using (if required) the allocator from
  /// 'document'. Should set the following fields where appropriate:
  ///
  /// name, value, human_readable, description
  virtual void ToJson(rapidjson::Document* document, rapidjson::Value* val) = 0;

  /// Adds a new json value directly to 'document' of the form:
  /// "name" : "human-readable-string"
  ///
  /// This method is kept for backwards-compatibility with CM5.0.
  virtual void ToLegacyJson(rapidjson::Document* document) = 0;

  /// Writes this metric in the Prometheus text exposition format. Details of the
  /// format can be found at:
  /// https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/
  /// exposition_formats.md
  /// Should set the following fields where appropriate:
  ///
  /// name, value, metric_kind
  ///
  /// 'name' is the metric name to emit; the output goes to the 'val' and 'metric_kind'
  /// streams. Returns a TMetricKind::type.
  /// NOTE(review): presumably the returned kind is the kind of this metric - confirm
  /// against the implementations.
  virtual TMetricKind::type ToPrometheus(
      std::string name, std::stringstream* val, std::stringstream* metric_kind) = 0;

  /// Returns a human-readable representation of this metric. This is the
  /// representation that is often displayed in webpages etc.
  virtual std::string ToHumanReadable() = 0;

  const std::string& key() const { return key_; }
  const std::string& description() const { return description_; }

  /// Returns true iff 'type' is one of the time units TIME_MS, TIME_US or TIME_NS.
  /// Does not read any instance state, hence static.
  static bool IsUnitTimeBased(TUnit::type type) {
    return (type == TUnit::type::TIME_MS || type == TUnit::type::TIME_US
        || type == TUnit::type::TIME_NS);
  }

 protected:
  /// Unique key identifying this metric
  const std::string key_;

  /// Description of this metric.
  /// TODO: share one copy amongst metrics with the same description.
  const std::string description_;

  friend class MetricGroup;

  /// Initialises the key and description from 'def'. Only callable by subclasses (and
  /// the MetricGroup friend).
  explicit Metric(const TMetricDef& def)
    : key_(def.key), description_(def.description) { }

  /// Convenience method to add standard fields (name, description, human readable string)
  /// to 'val'.
  void AddStandardFields(rapidjson::Document* document, rapidjson::Value* val);
};

/// A ScalarMetric holds a single value of a simple primitive type, e.g. an integer, a
/// string or a float. Besides the value type, it is parameterised by the unit of the
/// value (e.g. bytes/s, taken from TUnit) and by the 'kind' of the metric, which is one
/// of:
/// - 'gauge': may increase or decrease over time,
/// - 'counter': can only increase over time,
/// - 'property': a value store which can be read and written only.
///
/// Management software may use the metric kind as a hint on how to display the value.
/// The current value of a ScalarMetric is obtained with GetValue() and set/initialized
/// with SetValue(). Both methods are thread safe.
template<typename T, TMetricKind::type metric_kind_t>
class ScalarMetric: public Metric {
 public:
  /// Takes the key, description and unit from 'metric_def'. The kind recorded in
  /// 'metric_def' is expected to equal 'metric_kind_t' (DCHECKed).
  ScalarMetric(const TMetricDef& metric_def)
    : Metric(metric_def), unit_(metric_def.units) {
    DCHECK_EQ(metric_kind_t, metric_def.kind) << "Metric kind does not match definition: "
        << metric_def.key;
  }

  ~ScalarMetric() override = default;

  /// Returns the current value. Thread-safe.
  virtual T GetValue() = 0;

  // Metric interface; declared here, defined outside of the class body.
  void ToJson(rapidjson::Document* document, rapidjson::Value* val) override;

  TMetricKind::type ToPrometheus(
      std::string name, std::stringstream* val, std::stringstream* metric_kind) override;

  std::string ToHumanReadable() override;

  void ToLegacyJson(rapidjson::Document* document) override;

  /// The unit this metric was defined with.
  TUnit::type unit() const { return unit_; }

  /// The kind of this metric, i.e. the 'metric_kind_t' template argument.
  TMetricKind::type kind() const { return metric_kind_t; }

 protected:
  /// Units of this metric.
  const TUnit::type unit_;
};

/// A ScalarMetric whose value is protected by a spinlock.
template<typename T, TMetricKind::type metric_kind_t>
class LockedMetric : public ScalarMetric<T, metric_kind_t> {
 public:
  /// Creates the metric from 'metric_def' with 'initial_value' as its starting value.
  LockedMetric(const TMetricDef& metric_def, const T& initial_value)
    : ScalarMetric<T, metric_kind_t>(metric_def), value_(initial_value) {}

  ~LockedMetric() override = default;

  /// Returns a copy of the current value, read while holding 'lock_'.
  T GetValue() override {
    boost::lock_guard<SpinLock> guard(lock_);
    return value_;
  }

  /// Replaces the current value with 'value' while holding 'lock_'.
  void SetValue(const T& value) {
    boost::lock_guard<SpinLock> guard(lock_);
    value_ = value;
  }

 protected:
  /// Guards access to value_.
  SpinLock lock_;

  /// The current value of the metric
  T value_;
};

/// An implementation of 'gauge' or 'counter' metric kind. The metric can be incremented
/// atomically via the Increment() interface.
template<TMetricKind::type metric_kind_t>
class AtomicMetric : public ScalarMetric<int64_t, metric_kind_t> {
  // The kind is a template argument, so an unsupported kind is rejected at compile time
  // rather than by a debug-only runtime check.
  static_assert(
      metric_kind_t == TMetricKind::GAUGE || metric_kind_t == TMetricKind::COUNTER,
      "AtomicMetric only supports the GAUGE and COUNTER metric kinds");

 public:
  /// Creates the metric from 'metric_def' with 'initial_value' as its starting value.
  AtomicMetric(const TMetricDef& metric_def, const int64_t initial_value)
    : ScalarMetric<int64_t, metric_kind_t>(metric_def), value_(initial_value) {}

  ~AtomicMetric() override = default;

  /// Atomically reads the current value. May be overridden by derived classes.
  /// The default implementation just atomically loads 'value_'. Derived classes
  /// which derive the return value from multiple sources other than 'value_'
  /// need to take care of synchronization among sources.
  int64_t GetValue() override { return value_.Load(); }

  /// Atomically sets the value.
  void SetValue(const int64_t& value) { value_.Store(value); }

  /// Adds 'delta' to the current value atomically and returns the new value.
  /// For COUNTER metrics 'delta' must not be negative (DCHECKed), since a counter can
  /// only increase.
  int64_t Increment(int64_t delta) {
    DCHECK(metric_kind_t != TMetricKind::COUNTER || delta >= 0)
        << "Can't decrement value of COUNTER metric: " << this->key();
    return value_.Add(delta);
  }

 protected:
  /// The current value of the metric.
  AtomicInt64 value_;
};

/// A gauge that tracks both the current value and the highest value seen so far (the
/// high water mark, HWM).
///
/// Implementation notes:
/// 'hwm_value_' stores the HWM, while the separate 'current_value_' metric stores the
/// current value. Because these are two independent atomics, they may be out of sync
/// for a short duration. This behavior is acceptable for the current use case. It is
/// however very important that 'hwm_value_' and 'current_value_' are always updated
/// together, through the interfaces of this class.
class AtomicHighWaterMarkGauge : public ScalarMetric<int64_t, TMetricKind::GAUGE> {
 public:
  /// 'current_value' is the gauge holding the current value; it must not be NULL and
  /// its value must equal 'initial_value', which becomes the initial HWM (DCHECKed).
  AtomicHighWaterMarkGauge(
      const TMetricDef& metric_def, int64_t initial_value, IntGauge* current_value)
    : ScalarMetric<int64_t, TMetricKind::GAUGE>(metric_def),
      hwm_value_(initial_value),
      current_value_(current_value) {
    DCHECK(current_value_ != NULL && initial_value == current_value->GetValue());
  }

  ~AtomicHighWaterMarkGauge() {}

  /// Returns the current high water mark value.
  int64_t GetValue() override { return hwm_value_.Load(); }

  /// Stores 'value' as the current value, then raises the high water mark to 'value'
  /// if 'value' exceeds it. Each of the two updates is atomic on its own.
  void SetValue(const int64_t& value) {
    current_value_->SetValue(value);
    UpdateMax(value);
  }

  /// Atomically adds 'delta' to the current value, then raises the high water mark to
  /// the resulting value if that value exceeds it.
  void Increment(int64_t delta) {
    UpdateMax(current_value_->Increment(delta));
  }

  /// The gauge that holds the current value.
  IntGauge* current_value() const { return current_value_; }

 private:
  FRIEND_TEST(MetricsTest, AtomicHighWaterMarkGauge);
  friend class TmpFileMgrTest;

  /// Raises 'hwm_value_' to 'v' if 'v' is larger than 'hwm_value_'. The entire
  /// operation is atomic.
  void UpdateMax(int64_t v) {
    // Each iteration re-reads the HWM. Nothing is written unless 'v' exceeds it; when
    // the compare-and-swap loses a race with another writer, the check is repeated
    // against the freshly loaded value.
    for (int64_t seen = hwm_value_.Load(); seen < v; seen = hwm_value_.Load()) {
      if (LIKELY(hwm_value_.CompareAndSwap(seen, v))) return;
    }
  }

  /// The high water mark value.
  AtomicInt64 hwm_value_;
  /// The metric representing the current value.
  IntGauge* current_value_;
};

/// Gauge metric that computes the sum of several gauges.
class SumGauge : public IntGauge {
 public:
  SumGauge(const TMetricDef& metric_def, const std::vector<IntGauge*>& gauges)
    : IntGauge(metric_def, 0), gauges_(gauges) {}

  virtual ~SumGauge() {}

  virtual int64_t GetValue() override {
    // Note that this doesn't hold the locks of all gauages before computing the sum so
    // it's possible for one of the gauages to change after being read and added to sum.
    int64_t sum = 0;
    for (auto gauge : gauges_) sum += gauge->GetValue();
    return sum;
  }

 private:
  /// The gauges to be summed.
  std::vector<IntGauge*> gauges_;
};

/// Gauge metric that negates another gauge.
class NegatedGauge : public IntGauge {
 public:
  NegatedGauge(const TMetricDef& metric_def, IntGauge* gauge)
    : IntGauge(metric_def, 0), gauge_(gauge) {}

  virtual ~NegatedGauge() {}

  virtual int64_t GetValue() override { return -gauge_->GetValue(); }

 private:
  /// The metric to be negated.
  IntGauge* gauge_;
};

/// Container for a set of metrics. A MetricGroup owns the memory for every metric
/// contained within it (see Add*() to create commonly used metric
/// types). Metrics are 'registered' with a MetricGroup and can be deleted/removed after
/// being registered. Note: deletion invalidates any pointers to the deleted metric.
//
/// MetricGroups may be organised hierarchically as a tree.
//
/// Typically a metric object is cached by its creator after registration. If a metric
/// must be retrieved without an available pointer, FindMetricForTesting() will search the
/// MetricGroup and all its descendent MetricGroups in turn.
//
/// TODO: Hierarchical naming: that is, resolve "group1.group2.metric-name" to a path
/// through the metric tree.
class MetricGroup {
 public:
  MetricGroup(const std::string& name);

  /// Registers a new metric. Ownership of the metric will be transferred to this
  /// MetricGroup object, so callers should take care not to destroy the Metric they pass
  /// in.
  //
  /// It is an error to call twice with metrics with the same key. The template parameter
  /// M must be a subclass of Metric.
  template <typename M>
  M* RegisterMetric(M* metric) {
    DCHECK(!metric->key_.empty());
    boost::lock_guard<SpinLock> l(lock_);
    DCHECK(metric_map_.find(metric->key_) == metric_map_.end()) << metric->key_;
    std::shared_ptr<M> metric_ptr(metric);
    metric_map_[metric->key_] = metric_ptr;
    return metric_ptr.get();
  }

  /// Remove the metric from the metric group and release its memory. This invalidates any
  /// pointers to the deleted metric.
  /// It is an error to call this with a non-existent or already removed metric.
  void RemoveMetric(const std::string& key, const std::string& metric_def_arg = "") {
    TMetricDef metric_def = MetricDefs::Get(key, metric_def_arg);
    DCHECK(!metric_def.key.empty());
    boost::lock_guard<SpinLock> l(lock_);
    DCHECK(metric_map_.find(metric_def.key) != metric_map_.end()) << metric_def.key;
    metric_map_.erase(metric_def.key);
    }

  /// Create a gauge metric object with given key and initial value (owned by this object)
  IntGauge* AddGauge(const std::string& key, const int64_t value,
      const std::string& metric_def_arg = "") {
    return RegisterMetric(new IntGauge(MetricDefs::Get(key, metric_def_arg), value));
  }

  DoubleGauge* AddDoubleGauge(const std::string& key, const double value,
      const std::string& metric_def_arg = "") {
    return RegisterMetric(new DoubleGauge(MetricDefs::Get(key, metric_def_arg), value));
  }

  template<typename T>
  LockedMetric<T, TMetricKind::PROPERTY>* AddProperty(const std::string& key,
      const T& value, const std::string& metric_def_arg = "") {
    return RegisterMetric(new LockedMetric<T, TMetricKind::PROPERTY>(
        MetricDefs::Get(key, metric_def_arg), value));
  }

  IntCounter* AddCounter(const std::string& key, const int64_t value,
      const std::string& metric_def_arg = "") {
    return RegisterMetric(new IntCounter(MetricDefs::Get(key, metric_def_arg), value));
  }

  AtomicHighWaterMarkGauge* AddHWMGauge(const std::string& key_hwm,
      const std::string& key_curent_value, const int64_t value,
      const std::string& metric_def_arg = "") {
    IntGauge* current_value_metric = RegisterMetric(
        new IntGauge(MetricDefs::Get(key_curent_value, metric_def_arg), value));
    return RegisterMetric(new AtomicHighWaterMarkGauge(
        MetricDefs::Get(key_hwm, metric_def_arg), value, current_value_metric));
  }

  /// Returns a metric by key. All MetricGroups reachable from this group are searched in
  /// depth-first order, starting with the root group.  Returns NULL if there is no metric
  /// with that key. This is not a very cheap operation; the result should be cached where
  /// possible.
  //
  /// Used for testing only.
  template <typename M>
  M* FindMetricForTesting(const std::string& key) {
    return reinterpret_cast<M*>(FindMetricForTestingInternal(key));
  }

  /// Register page callbacks with the webserver. Only the root of any metric group
  /// hierarchy needs to do this.
  Status Init(Webserver* webserver);

  /// Converts this metric group (and optionally all of its children recursively) to JSON.
  void ToJson(bool include_children, rapidjson::Document* document,
      rapidjson::Value* out_val);

  /// Converts this metric group (and optionally all of its children recursively) to JSON.
  void ToPrometheus(bool include_children, std::stringstream* out_val);

  /// Creates or returns an already existing child metric group.
  MetricGroup* GetOrCreateChildGroup(const std::string& name);

  /// Returns a child metric group with name 'name', or NULL if that group doesn't exist
  MetricGroup* FindChildGroup(const std::string& name);

  /// Useful for debuggers, returns the output of CMCompatibleCallback().
  std::string DebugString();

  const std::string& name() const { return name_; }

 private:
  FRIEND_TEST(MetricsTest, PrometheusMetricNames);
  /// Pool containing all child metric groups.
  boost::scoped_ptr<ObjectPool> obj_pool_;

  /// Name of this metric group.
  std::string name_;

  /// Guards metric_map_ and children_
  SpinLock lock_;

  /// Contains all metric objects, indexed by key. The shared_ptr enclosing the metric
  /// pointer owns the memory and only a single instance of this shared pointer exists
  /// which ensures that it is release when the entry is removed from the map.
  typedef std::unordered_map<std::string, std::shared_ptr<Metric>> MetricMap;
  MetricMap metric_map_;

  /// All child metric groups
  typedef std::unordered_map<std::string, MetricGroup*> ChildGroupMap;
  ChildGroupMap children_;

  /// Webserver callback for /metrics. Produces a tree of JSON values, each representing a
  /// metric group, and each including a list of metrics, and a list of immediate
  /// children.  If args contains a paramater 'metric', only the json for that metric is
  /// returned.
  void TemplateCallback(const Webserver::WebRequest& req,
      rapidjson::Document* document);

  /// Webserver callback for /metricsPrometheus. Produces string in prometheus format,
  /// each representing metric group, and each including a list of metrics, and a list
  /// of immediate children.  If args contains a paramater 'metric', only the json for
  /// that metric is returned.
  void PrometheusCallback(const Webserver::WebRequest& req, std::stringstream* data,
      HttpStatusCode* response);

  /// Legacy webpage callback for CM 5.0 and earlier. Produces a flattened map of (key,
  /// value) pairs for all metrics in this hierarchy.
  /// If args contains a paramater 'metric', only the json for that metric is returned.
  void CMCompatibleCallback(const Webserver::WebRequest& req,
      rapidjson::Document* document);

  /// Non-templated implementation for FindMetricForTesting() that does not cast.
  Metric* FindMetricForTestingInternal(const std::string& key);

  /// Convert an Impala metric name into its equivalent name for Prometheus.
  /// All metrics that do not already have "impala_" as a prefix are prefixed with
  /// "impala_" and have their names transformed to fit the standard Prometheus
  /// metric naming conventions.
  /// E.g.
  /// * "impala-server.num-fragments" becomes "impala_server_num_fragments"
  /// * "catalog.num-databases" becomes "impala_catalog_num_databases"
  /// * "memory.rss" becomes "impala_memory_rss"
  static std::string ImpalaToPrometheusName(const std::string& impala_metric_name);
};

/// Convenience method to instantiate a TMetricDef with a subset of its fields defined:
/// the caller supplies only the 'key', 'kind' and 'unit' of the definition.
/// Most externally-visible metrics should be defined in metrics.json and retrieved via
/// MetricDefs::Get(). This alternative method of instantiating TMetricDefs is only used
/// in special cases where the regular approach is unsuitable.
TMetricDef MakeTMetricDef(const std::string& key, TMetricKind::type kind,
    TUnit::type unit);

/// Helper to convert a value 'val' that is a time metric into fractional seconds
/// (the only unit of time supported by Prometheus).
/// NOTE(review): 'unit' presumably names the time unit 'val' is expressed in - confirm
/// against the definition, which is not part of this header.
template <typename T>
double ConvertToPrometheusSecs(const T& val, TUnit::type unit);

// These template classes are instantiated in the .cc file. The 'extern template'
// declarations below keep translation units that include this header from
// instantiating them implicitly.
extern template class LockedMetric<bool, TMetricKind::PROPERTY>;
extern template class LockedMetric<std::string, TMetricKind::PROPERTY>;
extern template class LockedMetric<double, TMetricKind::GAUGE>;
extern template class AtomicMetric<TMetricKind::GAUGE>;
extern template class AtomicMetric<TMetricKind::COUNTER>;
}
