| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <map> |
| #include <sstream> |
| #include <string> |
| #include <vector> |
| #include <boost/function.hpp> |
| #include <boost/scoped_ptr.hpp> |
| #include <boost/thread/locks.hpp> |
| #include <gtest/gtest_prod.h> // for FRIEND_TEST |
| #include <rapidjson/fwd.h> |
| |
| #include "common/atomic.h" |
| #include "common/logging.h" |
| #include "common/object-pool.h" |
| #include "common/status.h" |
| #include "kudu/util/web_callback_registry.h" |
| #include "util/debug-util.h" |
| #include "util/metrics-fwd.h" |
| #include "util/spinlock.h" |
| #include "util/webserver.h" |
| |
| using kudu::HttpStatusCode; |
| |
| namespace impala { |
| |
| /// Process-wide singleton exposing the metric definitions. Definitions live in |
| /// metrics.json, from which generate_metrics.py produces MetricDefs.thrift; this class |
| /// wraps an instance of those thrift definitions. |
| class MetricDefs { |
|  public: |
|   /// Looks up the TMetricDef registered under 'key'. When the definition is built from |
|   /// a format string, 'arg' supplies its (optional) argument. A DCHECK fails if 'key' |
|   /// does not exist. |
|   /// TODO: Support multiple arguments. |
|   static TMetricDef Get(const std::string& key, const std::string& arg = ""); |
| |
|  private: |
|   friend class MetricsTest; |
| |
|   /// Private: only this class and its friends may construct an instance. |
|   MetricDefs() {} |
| |
|   /// Returns the MetricDefs singleton. |
|   static MetricDefs* GetInstance(); |
| |
|   /// Map of all TMetricDefs. Kept non-const for testing. |
|   MetricDefsConstants metric_defs_; |
| |
|   DISALLOW_COPY_AND_ASSIGN(MetricDefs); |
| }; |
| |
| /// A metric is a container for some value, identified by a string key. Most metrics are |
| /// numeric, but this metric base-class is general enough such that metrics may be lists, |
| /// maps, histograms or other arbitrary structures. |
| /// |
| /// Metrics must be able to convert themselves to JSON (for integration with our |
| /// monitoring tools, and for rendering in webpages). See ToJson(), and also |
| /// ToLegacyJson() which ensures backwards compatibility with older versions of CM. |
| /// |
| /// Metrics should be supplied with a description, which is included in JSON output for |
| /// display by monitoring systems / Impala's webpages. |
| /// |
| /// TODO: Add ToThrift() for conversion to an RPC-friendly format. |
| class Metric { |
|  public: |
|   /// Virtual destructor: Metric is a polymorphic base class. |
|   virtual ~Metric() {} |
| |
|   /// Builds a new Value into 'val', using (if required) the allocator from |
|   /// 'document'. Should set the following fields where appropriate: |
|   /// |
|   /// name, value, human_readable, description |
|   virtual void ToJson(rapidjson::Document* document, rapidjson::Value* val) = 0; |
| |
|   /// Adds a new json value directly to 'document' of the form: |
|   /// "name" : "human-readable-string" |
|   /// |
|   /// This method is kept for backwards-compatibility with CM5.0. |
|   virtual void ToLegacyJson(rapidjson::Document* document) = 0; |
| |
|   /// Writes this metric into 'val' and 'metric_kind' using the Prometheus text |
|   /// exposition format. Details of this format can be found at: |
|   /// https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md |
|   /// Should set the following fields where appropriate: |
|   /// |
|   /// name, value, metric_kind |
|   /// |
|   /// 'name' is spelled std::string (not bare 'string') so that this header does not |
|   /// depend on a using-declaration leaking in from another include, and so that the |
|   /// signature matches the overrides in derived classes. |
|   virtual TMetricKind::type ToPrometheus( |
|       std::string name, std::stringstream* val, std::stringstream* metric_kind) = 0; |
| |
|   /// Returns a human-readable representation of this metric. This is the |
|   /// representation that is often displayed in webpages etc. |
|   virtual std::string ToHumanReadable() = 0; |
| |
|   const std::string& key() const { return key_; } |
|   const std::string& description() const { return description_; } |
| |
|   /// Returns true iff 'type' is one of the time units TIME_MS, TIME_US or TIME_NS. |
|   bool IsUnitTimeBased(TUnit::type type) { |
|     return (type == TUnit::type::TIME_MS || type == TUnit::type::TIME_US |
|         || type == TUnit::type::TIME_NS); |
|   } |
| |
|  protected: |
|   /// Unique key identifying this metric |
|   const std::string key_; |
| |
|   /// Description of this metric. |
|   /// TODO: share one copy amongst metrics with the same description. |
|   const std::string description_; |
| |
|   friend class MetricGroup; |
| |
|   /// Copies the key and description out of 'def'. |
|   Metric(const TMetricDef& def) : key_(def.key), description_(def.description) { } |
| |
|   /// Convenience method to add standard fields (name, description, human readable |
|   /// string) to 'val'. |
|   void AddStandardFields(rapidjson::Document* document, rapidjson::Value* val); |
| }; |
| |
| /// A ScalarMetric holds a value of a simple primitive type (e.g. integers, strings and |
| /// floats). Besides the value type it is parameterised by the unit (e.g. bytes/s), drawn |
| /// from TUnit, and by the 'kind' of the metric, which is one of: |
| /// - 'gauge', which may increase or decrease over time, |
| /// - 'counter' which can only increase over time |
| /// - 'property' which is a value store which can be read and written only |
| /// |
| /// Management software may use the metric kind as a hint on how to display the value. |
| /// The current value is returned by GetValue(); it is set/initialized with SetValue(). |
| /// Both methods are thread safe. |
| template <typename T, TMetricKind::type metric_kind_t> |
| class ScalarMetric : public Metric { |
|  public: |
|   /// Takes the key, description and unit from 'metric_def'. The kind recorded in |
|   /// 'metric_def' must agree with the 'metric_kind_t' template argument (DCHECKed). |
|   ScalarMetric(const TMetricDef& metric_def) |
|     : Metric(metric_def), unit_(metric_def.units) { |
|     DCHECK_EQ(metric_kind_t, metric_def.kind) |
|         << "Metric kind does not match definition: " << metric_def.key; |
|   } |
| |
|   ~ScalarMetric() override {} |
| |
|   /// Returns the current value. Thread-safe. |
|   virtual T GetValue() = 0; |
| |
|   void ToJson(rapidjson::Document* document, rapidjson::Value* val) override; |
| |
|   TMetricKind::type ToPrometheus(std::string name, std::stringstream* val, |
|       std::stringstream* metric_kind) override; |
| |
|   std::string ToHumanReadable() override; |
| |
|   void ToLegacyJson(rapidjson::Document* document) override; |
| |
|   /// Unit and kind accessors. |
|   TUnit::type unit() const { return unit_; } |
|   TMetricKind::type kind() const { return metric_kind_t; } |
| |
|  protected: |
|   /// Units of this metric. |
|   const TUnit::type unit_; |
| }; |
| |
| /// A scalar metric whose value is protected by a spinlock. |
| template <typename T, TMetricKind::type metric_kind_t> |
| class LockedMetric : public ScalarMetric<T, metric_kind_t> { |
|  public: |
|   /// Creates the metric described by 'metric_def', starting at 'initial_value'. |
|   LockedMetric(const TMetricDef& metric_def, const T& initial_value) |
|     : ScalarMetric<T, metric_kind_t>(metric_def), value_(initial_value) {} |
| |
|   ~LockedMetric() override {} |
| |
|   /// Returns a copy of the current value, taken while holding 'lock_'. |
|   T GetValue() override { |
|     boost::lock_guard<SpinLock> guard(lock_); |
|     return value_; |
|   } |
| |
|   /// Replaces the current value with 'value' while holding 'lock_'. |
|   void SetValue(const T& value) { |
|     boost::lock_guard<SpinLock> guard(lock_); |
|     value_ = value; |
|   } |
| |
|  protected: |
|   /// Guards access to value_. |
|   SpinLock lock_; |
| |
|   /// The current value of the metric |
|   T value_; |
| }; |
| |
| /// An implementation of 'gauge' or 'counter' metric kind. The metric can be incremented |
| /// atomically via the Increment() interface. |
| template <TMetricKind::type metric_kind_t> |
| class AtomicMetric : public ScalarMetric<int64_t, metric_kind_t> { |
|   // The supported kinds are a property of the template argument, so reject anything |
|   // else at compile time instead of with a debug-only runtime check. |
|   static_assert( |
|       metric_kind_t == TMetricKind::GAUGE || metric_kind_t == TMetricKind::COUNTER, |
|       "AtomicMetric only supports the GAUGE and COUNTER metric kinds"); |
| |
|  public: |
|   /// Creates the metric described by 'metric_def', starting at 'initial_value'. |
|   AtomicMetric(const TMetricDef& metric_def, const int64_t initial_value) |
|     : ScalarMetric<int64_t, metric_kind_t>(metric_def), value_(initial_value) {} |
| |
|   virtual ~AtomicMetric() {} |
| |
|   /// Atomically reads the current value. May be overridden by derived classes. |
|   /// The default implementation just atomically loads 'value_'. Derived classes |
|   /// which derive the return value from multiple sources other than 'value_' |
|   /// need to take care of synchronization among sources. |
|   virtual int64_t GetValue() override { return value_.Load(); } |
| |
|   /// Atomically sets the value. |
|   void SetValue(const int64_t& value) { value_.Store(value); } |
| |
|   /// Adds 'delta' to the current value atomically and returns the new value. |
|   /// For COUNTER metrics 'delta' must not be negative (DCHECKed). |
|   int64_t Increment(int64_t delta) { |
|     DCHECK(metric_kind_t != TMetricKind::COUNTER || delta >= 0) |
|         << "Can't decrement value of COUNTER metric: " << this->key(); |
|     return value_.Add(delta); |
|   } |
| |
|  protected: |
|   /// The current value of the metric. |
|   AtomicInt64 value_; |
| }; |
| |
| /// A gauge that tracks both the current value and the highest value seen so far (the |
| /// high water mark, HWM). |
| /// |
| /// Implementation notes: |
| /// 'hwm_value_' holds the HWM and the 'current_value_' metric holds the current value. |
| /// Because these are two separate atomics they can be out of sync for a short |
| /// duration, which is acceptable for the current use case. It is, however, very |
| /// important that both are only ever updated together through the interfaces of this |
| /// class. |
| class AtomicHighWaterMarkGauge : public ScalarMetric<int64_t, TMetricKind::GAUGE> { |
|  public: |
|   /// 'current_value' must be non-NULL and must currently hold 'initial_value' |
|   /// (DCHECKed). |
|   AtomicHighWaterMarkGauge( |
|       const TMetricDef& metric_def, int64_t initial_value, IntGauge* current_value) |
|     : ScalarMetric<int64_t, TMetricKind::GAUGE>(metric_def), |
|       hwm_value_(initial_value), |
|       current_value_(current_value) { |
|     DCHECK(current_value_ != NULL && initial_value == current_value->GetValue()); |
|   } |
| |
|   ~AtomicHighWaterMarkGauge() {} |
| |
|   /// Returns the current high water mark value. |
|   int64_t GetValue() override { return hwm_value_.Load(); } |
| |
|   /// Stores 'value' as the current value, then raises the high water mark if needed. |
|   /// Each of the two updates is atomic on its own. |
|   void SetValue(const int64_t& value) { |
|     current_value_->SetValue(value); |
|     UpdateMax(value); |
|   } |
| |
|   /// Atomically adds 'delta' to the current value, then raises the high water mark to |
|   /// the resulting value if needed. |
|   void Increment(int64_t delta) { UpdateMax(current_value_->Increment(delta)); } |
| |
|   IntGauge* current_value() const { return current_value_; } |
| |
|  private: |
|   FRIEND_TEST(MetricsTest, AtomicHighWaterMarkGauge); |
|   friend class TmpFileMgrTest; |
| |
|   /// Raises 'hwm_value_' to 'v' if 'v' is larger than 'hwm_value_'. The entire |
|   /// operation is atomic: the compare-and-swap is retried until it succeeds or 'v' no |
|   /// longer exceeds the recorded maximum. |
|   void UpdateMax(int64_t v) { |
|     for (;;) { |
|       const int64_t observed = hwm_value_.Load(); |
|       // The recorded maximum already covers 'v': skip the atomic update. |
|       if (v <= observed) return; |
|       if (LIKELY(hwm_value_.CompareAndSwap(observed, v))) return; |
|     } |
|   } |
| |
|   /// The high water mark value. |
|   AtomicInt64 hwm_value_; |
|   /// The metric representing the current value. |
|   IntGauge* current_value_; |
| }; |
| |
| /// Gauge metric whose value is the sum of several other gauges. |
| class SumGauge : public IntGauge { |
|  public: |
|   /// Keeps a copy of the 'gauges' pointer list; the gauge's own stored value starts at 0. |
|   SumGauge(const TMetricDef& metric_def, const std::vector<IntGauge*>& gauges) |
|     : IntGauge(metric_def, 0), gauges_(gauges) {} |
| |
|   ~SumGauge() override {} |
| |
|   /// Returns the sum of the current values of all summed gauges. |
|   int64_t GetValue() override { |
|     // The gauges are read one after another without any common lock, so a gauge may |
|     // change after it has already been read and added to the total. |
|     int64_t total = 0; |
|     for (IntGauge* gauge : gauges_) { |
|       total += gauge->GetValue(); |
|     } |
|     return total; |
|   } |
| |
|  private: |
|   /// The gauges to be summed. |
|   std::vector<IntGauge*> gauges_; |
| }; |
| |
| /// Gauge metric whose value is the negation of another gauge's value. |
| class NegatedGauge : public IntGauge { |
|  public: |
|   /// 'gauge' is the metric to negate; the gauge's own stored value starts at 0. |
|   NegatedGauge(const TMetricDef& metric_def, IntGauge* gauge) |
|     : IntGauge(metric_def, 0), gauge_(gauge) {} |
| |
|   ~NegatedGauge() override {} |
| |
|   /// Returns the current value of the wrapped gauge with its sign flipped. |
|   int64_t GetValue() override { |
|     const int64_t wrapped_value = gauge_->GetValue(); |
|     return -wrapped_value; |
|   } |
| |
|  private: |
|   /// The metric to be negated. |
|   IntGauge* gauge_; |
| }; |
| |
| /// Container for a set of metrics. A MetricGroup owns the memory for every metric |
| /// contained within it (see Add*() to create commonly used metric |
| /// types). Metrics are 'registered' with a MetricGroup and can be deleted/removed after |
| /// being registered. Note: deletion invalidates any pointers to the deleted metric. |
| /// |
| /// MetricGroups may be organised hierarchically as a tree. |
| /// |
| /// Typically a metric object is cached by its creator after registration. If a metric |
| /// must be retrieved without an available pointer, FindMetricForTesting() will search |
| /// the MetricGroup and all its descendent MetricGroups in turn. |
| /// |
| /// TODO: Hierarchical naming: that is, resolve "group1.group2.metric-name" to a path |
| /// through the metric tree. |
| class MetricGroup { |
|  public: |
|   MetricGroup(const std::string& name); |
| |
|   /// Registers a new metric. Ownership of the metric will be transferred to this |
|   /// MetricGroup object, so callers should take care not to destroy the Metric they |
|   /// pass in. Returns the registered metric. |
|   /// |
|   /// It is an error to call twice with metrics with the same key. The template |
|   /// parameter M must be a subclass of Metric. |
|   template <typename M> |
|   M* RegisterMetric(M* metric) { |
|     DCHECK(!metric->key_.empty()); |
|     boost::lock_guard<SpinLock> l(lock_); |
|     DCHECK(metric_map_.find(metric->key_) == metric_map_.end()) << metric->key_; |
|     std::shared_ptr<M> metric_ptr(metric); |
|     metric_map_[metric->key_] = metric_ptr; |
|     return metric_ptr.get(); |
|   } |
| |
|   /// Remove the metric from the metric group and release its memory. This invalidates |
|   /// any pointers to the deleted metric. The key to remove is taken from the metric |
|   /// definition returned by MetricDefs::Get(key, metric_def_arg). |
|   /// It is an error to call this with a non-existent or already removed metric. |
|   void RemoveMetric(const std::string& key, const std::string& metric_def_arg = "") { |
|     TMetricDef metric_def = MetricDefs::Get(key, metric_def_arg); |
|     DCHECK(!metric_def.key.empty()); |
|     boost::lock_guard<SpinLock> l(lock_); |
|     DCHECK(metric_map_.find(metric_def.key) != metric_map_.end()) << metric_def.key; |
|     metric_map_.erase(metric_def.key); |
|   } |
| |
|   /// Create a gauge metric object with given key and initial value (owned by this |
|   /// object) |
|   IntGauge* AddGauge(const std::string& key, const int64_t value, |
|       const std::string& metric_def_arg = "") { |
|     return RegisterMetric(new IntGauge(MetricDefs::Get(key, metric_def_arg), value)); |
|   } |
| |
|   /// Same as AddGauge(), but for a gauge holding a double value. |
|   DoubleGauge* AddDoubleGauge(const std::string& key, const double value, |
|       const std::string& metric_def_arg = "") { |
|     return RegisterMetric(new DoubleGauge(MetricDefs::Get(key, metric_def_arg), value)); |
|   } |
| |
|   /// Create a property metric of value type T with given key and initial value (owned |
|   /// by this object). |
|   template <typename T> |
|   LockedMetric<T, TMetricKind::PROPERTY>* AddProperty(const std::string& key, |
|       const T& value, const std::string& metric_def_arg = "") { |
|     return RegisterMetric(new LockedMetric<T, TMetricKind::PROPERTY>( |
|         MetricDefs::Get(key, metric_def_arg), value)); |
|   } |
| |
|   /// Create a counter metric object with given key and initial value (owned by this |
|   /// object) |
|   IntCounter* AddCounter(const std::string& key, const int64_t value, |
|       const std::string& metric_def_arg = "") { |
|     return RegisterMetric(new IntCounter(MetricDefs::Get(key, metric_def_arg), value)); |
|   } |
| |
|   /// Registers two metrics, both owned by this object: an IntGauge under |
|   /// 'key_curent_value' holding the current value, and an AtomicHighWaterMarkGauge |
|   /// under 'key_hwm' that tracks the high water mark of that gauge. Both start at |
|   /// 'value'. Returns the high water mark gauge. |
|   AtomicHighWaterMarkGauge* AddHWMGauge(const std::string& key_hwm, |
|       const std::string& key_curent_value, const int64_t value, |
|       const std::string& metric_def_arg = "") { |
|     IntGauge* current_value_metric = RegisterMetric( |
|         new IntGauge(MetricDefs::Get(key_curent_value, metric_def_arg), value)); |
|     return RegisterMetric(new AtomicHighWaterMarkGauge( |
|         MetricDefs::Get(key_hwm, metric_def_arg), value, current_value_metric)); |
|   } |
| |
|   /// Returns a metric by key. All MetricGroups reachable from this group are searched |
|   /// in depth-first order, starting with the root group. Returns NULL if there is no |
|   /// metric with that key. This is not a very cheap operation; the result should be |
|   /// cached where possible. |
|   /// |
|   /// M must be a subclass of Metric and must be the actual type of the metric found: |
|   /// the downcast is a static_cast, which is verified against the class hierarchy at |
|   /// compile time but is not checked at runtime. |
|   /// |
|   /// Used for testing only. |
|   template <typename M> |
|   M* FindMetricForTesting(const std::string& key) { |
|     return static_cast<M*>(FindMetricForTestingInternal(key)); |
|   } |
| |
|   /// Register page callbacks with the webserver. Only the root of any metric group |
|   /// hierarchy needs to do this. |
|   Status Init(Webserver* webserver); |
| |
|   /// Converts this metric group (and optionally all of its children recursively) to |
|   /// JSON. |
|   void ToJson(bool include_children, rapidjson::Document* document, |
|       rapidjson::Value* out_val); |
| |
|   /// Converts this metric group (and optionally all of its children recursively) to |
|   /// the Prometheus format, written to 'out_val'. |
|   void ToPrometheus(bool include_children, std::stringstream* out_val); |
| |
|   /// Creates or returns an already existing child metric group. |
|   MetricGroup* GetOrCreateChildGroup(const std::string& name); |
| |
|   /// Returns a child metric group with name 'name', or NULL if that group doesn't exist |
|   MetricGroup* FindChildGroup(const std::string& name); |
| |
|   /// Useful for debuggers, returns the output of CMCompatibleCallback(). |
|   std::string DebugString(); |
| |
|   const std::string& name() const { return name_; } |
| |
|  private: |
|   FRIEND_TEST(MetricsTest, PrometheusMetricNames); |
|   /// Pool containing all child metric groups. |
|   boost::scoped_ptr<ObjectPool> obj_pool_; |
| |
|   /// Name of this metric group. |
|   std::string name_; |
| |
|   /// Guards metric_map_ and children_ |
|   SpinLock lock_; |
| |
|   /// Contains all metric objects, indexed by key. The shared_ptr enclosing the metric |
|   /// pointer owns the memory and only a single instance of this shared pointer exists |
|   /// which ensures that it is released when the entry is removed from the map. |
|   typedef std::unordered_map<std::string, std::shared_ptr<Metric>> MetricMap; |
|   MetricMap metric_map_; |
| |
|   /// All child metric groups |
|   typedef std::unordered_map<std::string, MetricGroup*> ChildGroupMap; |
|   ChildGroupMap children_; |
| |
|   /// Webserver callback for /metrics. Produces a tree of JSON values, each |
|   /// representing a metric group, and each including a list of metrics, and a list of |
|   /// immediate children. If args contains a parameter 'metric', only the json for that |
|   /// metric is returned. |
|   void TemplateCallback(const Webserver::WebRequest& req, |
|       rapidjson::Document* document); |
| |
|   /// Webserver callback for /metricsPrometheus. Produces string in prometheus format, |
|   /// each representing metric group, and each including a list of metrics, and a list |
|   /// of immediate children. If args contains a parameter 'metric', only the output for |
|   /// that metric is returned. |
|   void PrometheusCallback(const Webserver::WebRequest& req, std::stringstream* data, |
|       HttpStatusCode* response); |
| |
|   /// Legacy webpage callback for CM 5.0 and earlier. Produces a flattened map of (key, |
|   /// value) pairs for all metrics in this hierarchy. |
|   /// If args contains a parameter 'metric', only the json for that metric is returned. |
|   void CMCompatibleCallback(const Webserver::WebRequest& req, |
|       rapidjson::Document* document); |
| |
|   /// Non-templated implementation for FindMetricForTesting() that does not cast. |
|   Metric* FindMetricForTestingInternal(const std::string& key); |
| |
|   /// Convert an Impala metric name into its equivalent name for Prometheus. |
|   /// All metrics that do not already have "impala_" as a prefix are prefixed with |
|   /// "impala_" and have their names transformed to fit the standard Prometheus |
|   /// metric naming conventions. |
|   /// E.g. |
|   /// * "impala-server.num-fragments" becomes "impala_server_num_fragments" |
|   /// * "catalog.num-databases" becomes "impala_catalog_num_databases" |
|   /// * "memory.rss" becomes "impala_memory_rss" |
|   static std::string ImpalaToPrometheusName(const std::string& impala_metric_name); |
| }; |
| |
| /// Builds a TMetricDef in which only a subset of the fields is defined, from 'key', |
| /// 'kind' and 'unit'. Most externally-visible metrics should instead be defined in |
| /// metrics.json and retrieved via MetricDefs::Get(); this alternative is only used in |
| /// special cases where the regular approach is unsuitable. |
| TMetricDef MakeTMetricDef( |
|     const std::string& key, TMetricKind::type kind, TUnit::type unit); |
| |
| /// Converts 'val', the value of a time metric expressed in 'unit', into fractional |
| /// seconds, which is the only unit of time supported by Prometheus. |
| template <typename T> double ConvertToPrometheusSecs(const T& val, TUnit::type unit); |
| |
| // Explicit instantiation declarations: these template classes are instantiated in the |
| // .cc file. |
| extern template class AtomicMetric<TMetricKind::GAUGE>; |
| extern template class AtomicMetric<TMetricKind::COUNTER>; |
| extern template class LockedMetric<bool, TMetricKind::PROPERTY>; |
| extern template class LockedMetric<std::string, TMetricKind::PROPERTY>; |
| extern template class LockedMetric<double, TMetricKind::GAUGE>; |
| } |