// blob: b24aeb13542d8ec86d58b3037d2c3109f72bb5b8
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/util/metrics.h"
#include <iostream>
#include <tuple>
#include <utility>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/singleton.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/hdr_histogram.h"
#include "kudu/util/histogram.pb.h"
#include "kudu/util/status.h"
#include "kudu/util/string_case.h"
DEFINE_int32(metrics_retirement_age_ms, 120 * 1000,
"The minimum number of milliseconds a metric will be kept for after it is "
"no longer active. (Advanced option)");
TAG_FLAG(metrics_retirement_age_ms, runtime);
TAG_FLAG(metrics_retirement_age_ms, advanced);
// Process/server-wide metrics should go into the 'server' entity.
// More complex applications will define other entities.
METRIC_DEFINE_entity(server);
using std::string;
using std::unordered_set;
using std::vector;
using strings::Substitute;
using strings::SubstituteAndAppend;
namespace kudu {
// Serializes every metric in 'metrics' into a "metrics" JSON array on the
// current writer object, honoring the epoch and untouched-metric filters
// carried in 'opts'. Individual serialization failures are logged and skipped.
template<typename Collection>
void WriteMetricsToJson(JsonWriter* writer,
                        const Collection& metrics,
                        const MetricJsonOptions& opts) {
  writer->String("metrics");
  writer->StartArray();
  for (const auto& [prototype, metric] : metrics) {
    // Skip metrics that predate the requested epoch.
    if (!metric->ModifiedInOrAfterEpoch(opts.only_modified_in_or_after_epoch)) {
      continue;
    }
    // Skip never-touched metrics unless explicitly requested.
    if (!opts.include_untouched_metrics && metric->IsUntouched()) {
      continue;
    }
    WARN_NOT_OK(metric->WriteAsJson(writer, opts),
                Substitute("Failed to write $0 as JSON", prototype->name()));
  }
  writer->EndArray();
}
// Emits each metric in 'metrics' in Prometheus exposition format, prepending
// 'prefix' to every metric name. Failures are logged and skipped so a single
// bad metric doesn't abort the whole scrape.
void WriteMetricsPrometheus(PrometheusWriter* writer,
                            const MetricEntity::MetricMap& metrics,
                            const string& prefix) {
  for (const auto& entry : metrics) {
    const auto& metric = entry.second;
    WARN_NOT_OK(metric->WriteAsPrometheus(writer, prefix),
                Substitute("unable to write '$0' ($1) in Prometheus format",
                           entry.first, metric->prototype()->description()));
  }
}
// Writes each non-empty merged entity in 'merged_entity_metrics' as a JSON
// object carrying its type, id, and metrics array.
void WriteToJson(JsonWriter* writer,
                 const MergedEntityMetrics& merged_entity_metrics,
                 const MetricJsonOptions& opts) {
  for (const auto& [entity, metrics] : merged_entity_metrics) {
    // Entities whose metrics were entirely filtered out produce no output.
    if (metrics.empty()) {
      continue;
    }
    writer->StartObject();
    writer->String("type");
    writer->String(entity.type_);
    writer->String("id");
    writer->String(entity.id_);
    WriteMetricsToJson(writer, metrics, opts);
    writer->EndObject();
  }
}
//
// MetricUnit
//
// Returns the human-readable name for a metric unit; falls back to
// "UNKNOWN UNIT" (with a debug-build check) for unrecognized values.
const char* MetricUnit::Name(Type unit) {
  switch (unit) {
    case kCacheHits:             return "hits";
    case kCacheQueries:          return "queries";
    case kBytes:                 return "bytes";
    case kRequests:              return "requests";
    case kEntries:               return "entries";
    case kRows:                  return "rows";
    case kCells:                 return "cells";
    case kConnections:           return "connections";
    case kOperations:            return "operations";
    case kProbes:                return "probes";
    case kNanoseconds:           return "nanoseconds";
    case kMicroseconds:          return "microseconds";
    case kMilliseconds:          return "milliseconds";
    case kSeconds:               return "seconds";
    case kThreads:               return "threads";
    case kTransactions:          return "transactions";
    case kUnits:                 return "units";
    case kScanners:              return "scanners";
    case kMaintenanceOperations: return "operations";
    case kBlocks:                return "blocks";
    case kHoles:                 return "holes";
    case kLogBlockContainers:    return "log block containers";
    case kTasks:                 return "tasks";
    case kMessages:              return "messages";
    case kContextSwitches:       return "context switches";
    case kDataDirectories:       return "data directories";
    case kState:                 return "state";
    case kSessions:              return "sessions";
    case kTablets:               return "tablets";
    default:
      break;
  }
  DCHECK(false) << "Unknown unit with type = " << unit;
  return "UNKNOWN UNIT";
}
//
// MetricType
//
const char* const MetricType::kGaugeType = "gauge";
const char* const MetricType::kCounterType = "counter";
const char* const MetricType::kHistogramType = "histogram";

// Returns the canonical string for a metric type, or "UNKNOWN TYPE" for
// values outside the known set.
const char* MetricType::Name(MetricType::Type type) {
  if (type == kGauge) {
    return kGaugeType;
  }
  if (type == kCounter) {
    return kCounterType;
  }
  if (type == kHistogram) {
    return kHistogramType;
  }
  return "UNKNOWN TYPE";
}
//
// MetricEntityPrototype
//
MetricEntityPrototype::MetricEntityPrototype(const char* name)
    : name_(name) {
  // Self-register so this entity prototype appears in the global dumps.
  MetricPrototypeRegistry::get()->AddEntity(this);
}

MetricEntityPrototype::~MetricEntityPrototype() = default;

scoped_refptr<MetricEntity> MetricEntityPrototype::Instantiate(
    MetricRegistry* registry,
    const string& id,
    const MetricEntity::AttributeMap& initial_attrs) const {
  // Delegate to the registry so that repeated instantiations under the
  // same id resolve to a single shared entity.
  return registry->FindOrCreateEntity(this, id, initial_attrs);
}
//
// MetricEntity
//
MetricEntity::MetricEntity(const MetricEntityPrototype* prototype,
                           string id, AttributeMap attributes)
    : prototype_(prototype),
      id_(std::move(id)),
      attributes_(std::move(attributes)),
      published_(true) {
}

MetricEntity::~MetricEntity() = default;

void MetricEntity::CheckInstantiation(const MetricPrototype* proto) const {
  // A metric prototype may only be instantiated on the entity type it was
  // declared for.
  CHECK_STREQ(prototype_->name(), proto->entity_type())
      << "Metric " << proto->name() << " may not be instantiated entity of type "
      << prototype_->name() << " (expected: " << proto->entity_type() << ")";
}

scoped_refptr<Metric> MetricEntity::FindOrNull(const MetricPrototype& prototype) const {
  // Look up the instantiated metric for 'prototype', or null if absent.
  std::lock_guard<simple_spinlock> l(lock_);
  return FindPtrOrNull(metric_map_, &prototype);
}
namespace {
bool MatchName(const string& name, const string& other) {
string name_uc;
ToUpperCase(name, &name_uc);
string other_uc;
ToUpperCase(other, &other_uc);
// The parameter is a case-insensitive substring match of the metric name.
return name_uc.find(other_uc) != string::npos;
}
bool MatchNameInList(const string& name, const vector<string>& names) {
for (const string& other : names) {
if (MatchName(name, other)) {
return true;
}
}
return false;
}
const char* MetricLevelName(MetricLevel level) {
switch (level) {
case MetricLevel::kDebug:
return "debug";
case MetricLevel::kInfo:
return "info";
case MetricLevel::kWarn:
return "warn";
default:
return "UNKNOWN LEVEL";
}
}
int MetricLevelNumeric(MetricLevel level) {
switch (level) {
case MetricLevel::kDebug:
return 0;
case MetricLevel::kInfo:
return 1;
case MetricLevel::kWarn:
return 2;
default:
LOG(FATAL) << "Unknown metric level";
}
}
} // anonymous namespace
// Snapshots this entity's metrics and attributes, then applies 'filters' to
// the snapshot. Returns Status::NotFound when the entity as a whole -- or
// every one of its metrics -- is filtered out; callers treat NotFound as
// "skip this entity". Any surviving metrics/attributes are left in the
// output parameters.
Status MetricEntity::GetMetricsAndAttrs(const MetricFilters& filters,
                                        MetricMap* metrics,
                                        AttributeMap* attrs) const {
  CHECK(metrics);
  CHECK(attrs);

  // Filter the 'type'.
  if (!filters.entity_types.empty() && !MatchNameInList(prototype_->name(), filters.entity_types)) {
    return Status::NotFound("entity is filtered by entity type");
  }

  // Filter the 'id'.
  if (!filters.entity_ids.empty() && !MatchNameInList(id_, filters.entity_ids)) {
    return Status::NotFound("entity is filtered by entity id");
  }

  {
    // Snapshot the metrics in this registry (not guaranteed to be a consistent snapshot)
    std::lock_guard<simple_spinlock> l(lock_);
    *attrs = attributes_;
    *metrics = metric_map_;
  }

  // Filter the 'attributes'. 'entity_attrs' is a flat key/value pair list;
  // the entity survives if at least one pair matches.
  if (!filters.entity_attrs.empty()) {
    DCHECK(filters.entity_attrs.size() % 2 == 0);
    bool match_attrs = false;
    for (size_t i = 0; i < filters.entity_attrs.size(); i += 2) {
      // The attr_key can't be found or the attr_val can't be matched.
      const auto it = attrs->find(filters.entity_attrs[i]);
      if (it == attrs->end() || !MatchNameInList(it->second, { filters.entity_attrs[i + 1] })) {
        continue;
      }
      match_attrs = true;
      break;
    }
    // None of them match.
    if (!match_attrs) {
      return Status::NotFound("entity is filtered by some attribute");
    }
  }

  // Filter the 'metrics' by name.
  if (!filters.entity_metrics.empty()) {
    for (auto metric = metrics->begin(); metric != metrics->end();) {
      if (!MatchNameInList(metric->first->name(), filters.entity_metrics)) {
        metric = metrics->erase(metric);
      } else {
        ++metric;
      }
    }
    // None of them match.
    if (metrics->empty()) {
      return Status::NotFound("entity is filtered by metric types");
    }
  }

  // Filter the metrics by 'level'.
  // Includes all levels above the lowest matched level.
  int lowest = MetricLevelNumeric(MetricLevel::kDebug); // Default to the lowest level.
  if (!filters.entity_level.empty()) {
    if (MatchName(MetricLevelName(MetricLevel::kDebug), filters.entity_level)) {
      lowest = MetricLevelNumeric(MetricLevel::kDebug);
    } else if (MatchName(MetricLevelName(MetricLevel::kInfo), filters.entity_level)) {
      lowest = MetricLevelNumeric(MetricLevel::kInfo);
    } else if (MatchName(MetricLevelName(MetricLevel::kWarn), filters.entity_level)) {
      lowest = MetricLevelNumeric(MetricLevel::kWarn);
    }
  }
  for (auto metric = metrics->begin(); metric != metrics->end();) {
    if (MetricLevelNumeric(metric->first->level()) < lowest) {
      metric = metrics->erase(metric);
    } else {
      ++metric;
    }
  }
  // None of them match. NOTE: this emptiness check was previously performed
  // inside the loop above, which both re-scanned the map on every iteration
  // and matched the other filter stages inconsistently; it belongs after the
  // sweep, mirroring the metric-name filter above.
  if (metrics->empty()) {
    return Status::NotFound("entity is filtered by metric level");
  }

  return Status::OK();
}
// Writes this entity (type, id, optional attributes, metrics) as one JSON
// object. A filtered-out entity produces no output and returns OK.
Status MetricEntity::WriteAsJson(JsonWriter* writer, const MetricJsonOptions& opts) const {
  MetricMap metrics;
  AttributeMap attrs;
  Status s = GetMetricsAndAttrs(opts.filters, &metrics, &attrs);
  if (s.IsNotFound()) {
    // Status::NotFound is returned when this entity has been filtered, treat it
    // as OK, and skip printing it.
    return Status::OK();
  }
  // Propagate any other error instead of serializing a potentially partial
  // snapshot; this keeps the JSON path consistent with WriteAsPrometheus().
  RETURN_NOT_OK(s);

  writer->StartObject();
  writer->String("type");
  writer->String(prototype_->name());
  writer->String("id");
  writer->String(id_);
  if (opts.include_entity_attributes) {
    writer->String("attributes");
    writer->StartObject();
    for (const AttributeMap::value_type& val : attrs) {
      writer->String(val.first);
      writer->String(val.second);
    }
    writer->EndObject();
  }
  WriteMetricsToJson(writer, metrics, opts);
  writer->EndObject();
  return Status::OK();
}
// Writes this entity's metrics in Prometheus exposition format. Only the
// 'server' entity is supported; the master and tablet server get distinct
// metric-name prefixes.
Status MetricEntity::WriteAsPrometheus(PrometheusWriter* writer) const {
  static const string kIdMaster = "kudu.master";
  static const string kIdTabletServer = "kudu.tabletserver";

  // Only server-level metrics are emitted in Prometheus format as of now,
  // non-server metric entities are currently silently skipped.
  //
  // TODO(KUDU-3563): output tablet-level metrics in Prometheus format as well
  if (strcmp(prototype_->name(), "server") != 0) {
    return Status::OK();
  }

  // Empty filters result in getting all the metrics for this MetricEntity.
  //
  // TODO(aserbin): instead of hard-coding, pass MetricFilters as a parameter
  MetricFilters filters;
  filters.entity_level = "debug";
  MetricMap metrics;
  AttributeMap attrs;
  const auto s = GetMetricsAndAttrs(filters, &metrics, &attrs);
  if (s.IsNotFound()) {
    // Status::NotFound is returned when this entity has been filtered, treat it
    // as OK, and skip printing it.
    return Status::OK();
  }
  RETURN_NOT_OK(s);

  // Prefix all master metrics with 'kudu_master_' and all tablet server
  // metrics with 'kudu_tserver_'.
  static const string kMasterPrefix = "kudu_master_";
  static const string kTabletServerPrefix = "kudu_tserver_";
  const string* prefix = nullptr;
  if (id_ == kIdMaster) {
    prefix = &kMasterPrefix;
  } else if (id_ == kIdTabletServer) {
    prefix = &kTabletServerPrefix;
  } else {
    return Status::NotSupported(
        Substitute("$0: unexpected server-level metric entity", id_));
  }
  WriteMetricsPrometheus(writer, metrics, *prefix);
  return Status::OK();
}
// Collects this entity's (filtered) metrics into 'collections', merging them
// into an existing merged entity when a merge rule applies.
Status MetricEntity::CollectTo(MergedEntityMetrics* collections,
                               const MetricFilters& filters,
                               const MetricMergeRules& merge_rules) const {
  MetricMap metrics;
  AttributeMap attrs;
  Status s = GetMetricsAndAttrs(filters, &metrics, &attrs);
  if (s.IsNotFound()) {
    // Status::NotFound is returned when this entity has been filtered, treat it
    // as OK, and skip collecting it.
    return Status::OK();
  }

  // By default an entity is collected under its own type and id; a merge
  // rule redirects it to a merged entity keyed by one of its attributes.
  string entity_type = prototype_->name();
  string entity_id = id();
  const auto* merge_rule = ::FindOrNull(merge_rules, prototype_->name());
  if (merge_rule) {
    entity_type = merge_rule->merge_to;
    const auto* entity_id_ptr = ::FindOrNull(attrs, merge_rule->attribute_to_merge_by);
    if (!entity_id_ptr) {
      return Status::NotFound(Substitute("attribute $0 not found in entity $1",
                                         merge_rule->attribute_to_merge_by, entity_id));
    }
    entity_id = *entity_id_ptr;
  }

  MergedEntity e(entity_type, entity_id);
  auto& entity_collection = collections->emplace(e, MergedMetrics()).first->second;
  for (const auto& [prototype, metric] : metrics) {
    scoped_refptr<Metric> existing = FindPtrOrNull(entity_collection, prototype);
    if (existing) {
      // Fold this entity's value into the already-collected metric.
      existing->MergeFrom(metric);
    } else {
      // First occurrence: store a detached snapshot of the metric.
      scoped_refptr<Metric> snap = metric->snapshot();
      if (!snap->invalid_for_merge_) {
        snap->UpdateModificationEpoch();
      }
      InsertOrDie(&entity_collection, snap->prototype(), snap);
    }
  }
  return Status::OK();
}
void MetricEntity::RetireOldMetrics() {
MonoTime now(MonoTime::Now());
std::lock_guard<simple_spinlock> l(lock_);
for (auto it = metric_map_.begin(); it != metric_map_.end();) {
const scoped_refptr<Metric>& metric = it->second;
if (PREDICT_TRUE(!metric->HasOneRef() && published_)) {
// The metric is still in use. Note that, in the case of "NeverRetire()", the metric
// will have a ref-count of 2 because it is reffed by the 'never_retire_metrics_'
// collection.
// Ensure that it is not marked for later retirement (this could happen in the case
// that a metric is un-reffed and then re-reffed later by looking it up from the
// registry).
metric->retire_time_ = MonoTime();
++it;
continue;
}
if (!metric->retire_time_.Initialized()) {
VLOG(3) << "Metric " << it->first << " has become un-referenced or unpublished. "
<< "Will retire after the retention interval";
// This is the first time we've seen this metric as retirable.
metric->retire_time_ =
now + MonoDelta::FromMilliseconds(FLAGS_metrics_retirement_age_ms);
++it;
continue;
}
// If we've already seen this metric in a previous scan, check if it's
// time to retire it yet.
if (now < metric->retire_time_) {
VLOG(3) << "Metric " << it->first << " is un-referenced, but still within "
<< "the retention interval";
++it;
continue;
}
VLOG(2) << "Retiring metric " << it->first;
metric_map_.erase(it++);
}
}
void MetricEntity::NeverRetire(const scoped_refptr<Metric>& metric) {
  // Holding an extra reference here keeps the metric's ref-count above one,
  // so RetireOldMetrics() never considers it retirable.
  std::lock_guard<simple_spinlock> l(lock_);
  never_retire_metrics_.emplace_back(metric);
}

void MetricEntity::SetAttributes(const AttributeMap& attrs) {
  // Replace the whole attribute map.
  std::lock_guard<simple_spinlock> l(lock_);
  attributes_ = attrs;
}

void MetricEntity::SetAttribute(const string& key, const string& val) {
  // Insert or overwrite a single attribute.
  std::lock_guard<simple_spinlock> l(lock_);
  attributes_[key] = val;
}
//
// MetricRegistry
//
MetricRegistry::MetricRegistry() = default;

MetricRegistry::~MetricRegistry() = default;

// Dumps every registered entity as a JSON array, applying merge rules when
// present, then runs a retirement scan.
Status MetricRegistry::WriteAsJson(JsonWriter* writer, const MetricJsonOptions& opts) const {
  // Snapshot the entity map so the lock isn't held during serialization.
  EntityMap entities;
  {
    std::lock_guard<simple_spinlock> l(lock_);
    entities = entities_;
  }

  writer->StartArray();
  if (!opts.merge_rules.empty()) {
    // Collect entities into merged groups first, then dump the merged view.
    MergedEntityMetrics collections;
    for (const auto& e : entities) {
      WARN_NOT_OK(e.second->CollectTo(&collections, opts.filters, opts.merge_rules),
                  Substitute("Failed to collect entity $0", e.second->id()));
    }
    WriteToJson(writer, collections, opts);
  } else {
    for (const auto& e : entities) {
      WARN_NOT_OK(e.second->WriteAsJson(writer, opts),
                  Substitute("Failed to write entity $0 as JSON", e.second->id()));
    }
  }
  writer->EndArray();

  // Rather than having a thread poll metrics periodically to retire old ones,
  // we'll just retire them here. The only downside is that, if no one is polling
  // metrics, we may end up leaving them around indefinitely; however, metrics are
  // small, and one might consider it a feature: if monitoring stops polling for
  // metrics, we should keep them around until the next poll.
  entities.clear(); // necessary to deref metrics we just dumped before doing retirement scan.
  const_cast<MetricRegistry*>(this)->RetireOldMetrics();
  return Status::OK();
}
// Dumps every registered entity in Prometheus format, then runs a
// retirement scan.
Status MetricRegistry::WriteAsPrometheus(PrometheusWriter* writer) const {
  // Snapshot the entity map so the lock isn't held during serialization.
  EntityMap entities;
  {
    std::lock_guard<simple_spinlock> l(lock_);
    entities = entities_;
  }
  for (const auto& e : entities) {
    const auto& entity = e.second;
    WARN_NOT_OK(entity->WriteAsPrometheus(writer),
                Substitute("Failed to write entity $0 as Prometheus", entity->id()));
  }
  entities.clear(); // necessary to deref metrics we just dumped before doing retirement scan.
  const_cast<MetricRegistry*>(this)->RetireOldMetrics();
  return Status::OK();
}
// Runs each entity's retirement scan, then erases entities that are left
// with no metrics and are themselves unreferenced or unpublished.
void MetricRegistry::RetireOldMetrics() {
  std::lock_guard<simple_spinlock> l(lock_);
  for (auto it = entities_.begin(); it != entities_.end();) {
    it->second->RetireOldMetrics();

    const bool no_metrics = it->second->num_metrics() == 0;
    const bool removable = it->second->HasOneRef() || !it->second->published();
    if (no_metrics && removable) {
      // This entity has no metrics and either has no more external references or has
      // been marked as unpublished, so we can remove it.
      // Unlike retiring the metrics themselves, we don't wait for any timeout
      // to retire them -- we assume that that timed retention has been satisfied
      // by holding onto the metrics inside the entity.
      it = entities_.erase(it);
    } else {
      ++it;
    }
  }
}
//
// MetricPrototypeRegistry
//
MetricPrototypeRegistry* MetricPrototypeRegistry::get() {
return Singleton<MetricPrototypeRegistry>::get();
}
void MetricPrototypeRegistry::AddMetric(const MetricPrototype* prototype) {
std::lock_guard<simple_spinlock> l(lock_);
metrics_.push_back(prototype);
}
void MetricPrototypeRegistry::AddEntity(const MetricEntityPrototype* prototype) {
std::lock_guard<simple_spinlock> l(lock_);
entities_.push_back(prototype);
}
void MetricPrototypeRegistry::WriteAsJson(JsonWriter* writer) const {
std::lock_guard<simple_spinlock> l(lock_);
MetricJsonOptions opts;
opts.include_schema_info = true;
writer->StartObject();
// Dump metric prototypes.
writer->String("metrics");
writer->StartArray();
for (const MetricPrototype* p : metrics_) {
writer->StartObject();
p->WriteFields(writer, opts);
writer->String("entity_type");
writer->String(p->entity_type());
writer->EndObject();
}
writer->EndArray();
// Dump entity prototypes.
writer->String("entities");
writer->StartArray();
for (const MetricEntityPrototype* p : entities_) {
writer->StartObject();
writer->String("name");
writer->String(p->name());
writer->EndObject();
}
writer->EndArray();
writer->EndObject();
}
void MetricPrototypeRegistry::WriteAsJson() const {
std::ostringstream s;
JsonWriter w(&s, JsonWriter::PRETTY);
WriteAsJson(&w);
std::cout << s.str() << std::endl;
}
void MetricPrototypeRegistry::WriteAsXML() const {
std::lock_guard<simple_spinlock> l(lock_);
std::cout << "<?xml version=\"1.0\"?>" << "\n";
// Add a root node for the document.
std::cout << "<AllMetrics>" << "\n";
// Dump metric prototypes.
for (const MetricPrototype* p : metrics_) {
std::cout << "<metric>";
std::cout << "<name>" << p->name() << "</name>";
std::cout << "<label>" << p->label() << "</label>";
std::cout << "<type>" << MetricType::Name(p->type()) << "</type>";
std::cout << "<unit>" << MetricUnit::Name(p->unit()) << "</unit>";
std::cout << "<description>" << p->description() << "</description>";
std::cout << "<level>" << MetricLevelName(p->level()) << "</level>";
std::cout << "<entity_type>" << p->entity_type() << "</entity_type>";
std::cout << "</metric>" << "\n";
}
// Dump entity prototypes.
for (const MetricEntityPrototype* e : entities_) {
std::cout << "<entity>";
std::cout << "<name>" << e->name() << "</name>";
std::cout << "</entity>" << "\n";
}
std::cout << "</AllMetrics>" << "\n";
}
//
// MetricPrototype
//
MetricPrototype::MetricPrototype(CtorArgs args) : args_(args) {
  // Self-register so this prototype appears in the global schema dumps.
  MetricPrototypeRegistry::get()->AddMetric(this);
}

// Writes this prototype's fields into the current JSON object. The name is
// always written; the remaining schema fields only when requested via 'opts'.
void MetricPrototype::WriteFields(JsonWriter* writer,
                                  const MetricJsonOptions& opts) const {
  writer->String("name");
  writer->String(name());
  if (!opts.include_schema_info) {
    return;
  }
  writer->String("label");
  writer->String(label());
  writer->String("type");
  writer->String(MetricType::Name(type()));
  writer->String("unit");
  writer->String(MetricUnit::Name(unit()));
  writer->String("description");
  writer->String(description());
  writer->String("level");
  writer->String(MetricLevelName(level()));
}

// Emits the Prometheus '# HELP' and '# TYPE' preamble for this metric.
void MetricPrototype::WriteHelpAndType(PrometheusWriter* writer,
                                       const string& prefix) const {
  writer->WriteEntry(Substitute("# HELP $0$1 $2\n# TYPE $3$4 $5\n",
                                prefix, name(), description(),
                                prefix, name(), MetricType::Name(type())));
}
//
// FunctionGaugeDetacher
//
FunctionGaugeDetacher::FunctionGaugeDetacher() = default;

FunctionGaugeDetacher::~FunctionGaugeDetacher() {
  // Invoke every registered detach callback when the detacher goes out of
  // scope.
  for (const auto& detach : functions_) {
    detach();
  }
}
// Returns the entity registered under 'id', creating it if absent. An
// existing-but-unpublished entity is replaced by a fresh one; an existing
// published entity just has its attributes refreshed.
scoped_refptr<MetricEntity> MetricRegistry::FindOrCreateEntity(
    const MetricEntityPrototype* prototype,
    const string& id,
    const MetricEntity::AttributeMap& initial_attrs) {
  std::lock_guard<simple_spinlock> l(lock_);
  scoped_refptr<MetricEntity> e = FindPtrOrNull(entities_, id);
  if (e && e->published()) {
    // Reuse the live entity; just update its attributes.
    e->SetAttributes(initial_attrs);
  } else {
    // Either no entity exists under this id, or the existing one has been
    // unpublished; in both cases a new entity takes its place.
    e = new MetricEntity(prototype, id, initial_attrs);
    entities_[id] = e;
  }
  return e;
}
//
// Metric
//
// Global modification epoch shared by all metrics; advanced only by
// IncrementEpoch(). Metrics compare their own m_epoch_ against it to answer
// "modified in or after epoch E?" queries cheaply.
std::atomic<int64_t> Metric::g_epoch_;
// A new metric is stamped with the epoch reported by current_epoch() at
// construction time, i.e. it counts as "modified" in the current epoch.
Metric::Metric(const MetricPrototype* prototype)
: prototype_(prototype),
m_epoch_(current_epoch()) {
}
Metric::~Metric() {
}
// Advances the global epoch. Metrics touched after this call will carry a
// larger modification epoch than metrics untouched since.
void Metric::IncrementEpoch() {
g_epoch_++;
}
// Raises this metric's modification epoch (m_epoch_) up to the current
// global epoch. Called on the slow path once m_epoch_ is observed to lag
// g_epoch_.
void Metric::UpdateModificationEpochSlowPath() {
int64_t new_epoch, old_epoch;
// CAS loop to ensure that we never transition a metric's epoch backwards
// even if multiple threads race to update it.
do {
old_epoch = m_epoch_;
new_epoch = g_epoch_;
} while (old_epoch < new_epoch &&
!m_epoch_.compare_exchange_weak(old_epoch, new_epoch));
}
//
// Gauge
//
// A gauge serializes to JSON as an object of its prototype fields plus a
// "value" whose representation is supplied by the subclass via WriteValue().
Status Gauge::WriteAsJson(JsonWriter* writer,
                          const MetricJsonOptions& opts) const {
  writer->StartObject();
  prototype_->WriteFields(writer, opts);
  writer->String("value");
  WriteValue(writer);
  writer->EndObject();
  return Status::OK();
}

// Prometheus output: the HELP/TYPE preamble first, then the sample line(s)
// produced by the subclass.
Status Gauge::WriteAsPrometheus(PrometheusWriter* writer, const string& prefix) const {
  prototype_->WriteHelpAndType(writer, prefix);
  WriteValue(writer, prefix);
  return Status::OK();
}
//
// StringGauge
//
StringGauge::StringGauge(const GaugePrototype<string>* proto,
                         string initial_value,
                         unordered_set<string> initial_unique_values)
    : Gauge(proto),
      value_(std::move(initial_value)),
      unique_values_(std::move(initial_unique_values)) {
}

// Returns a detached copy of this gauge carrying the same value, unique-value
// set, and merge/retire bookkeeping.
scoped_refptr<Metric> StringGauge::snapshot() const {
  std::lock_guard<simple_spinlock> l(lock_);
  auto* copy = new StringGauge(down_cast<const GaugePrototype<string>*>(prototype_),
                               value_,
                               unique_values_);
  copy->m_epoch_.store(m_epoch_);
  copy->invalid_for_merge_ = invalid_for_merge_;
  copy->retire_time_ = retire_time_;
  return scoped_refptr<Metric>(copy);
}

// Before any merge the scalar value_ is authoritative; after a merge the
// accumulated set of unique values is reported (comma-joined).
string StringGauge::value() const {
  std::lock_guard<simple_spinlock> l(lock_);
  return unique_values_.empty() ? value_ : JoinStrings(unique_values_, ", ");
}

// Lazily seeds the unique-value set from the scalar value. Caller must hold
// lock_.
void StringGauge::FillUniqueValuesUnlocked() {
  if (unique_values_.empty()) {
    unique_values_.insert(value_);
  }
}

unordered_set<string> StringGauge::unique_values() {
  std::lock_guard<simple_spinlock> l(lock_);
  FillUniqueValuesUnlocked();
  return unique_values_;
}

void StringGauge::set_value(const string& value) {
  UpdateModificationEpoch();
  std::lock_guard<simple_spinlock> l(lock_);
  value_ = value;
  // Assigning a fresh scalar discards any previously merged values.
  unique_values_.clear();
}
void StringGauge::MergeFrom(const scoped_refptr<Metric>& other) {
if (PREDICT_FALSE(this == other.get())) {
return;
}
if (InvalidateIfNeededInMerge(other)) {
return;
}
UpdateModificationEpoch();
scoped_refptr<StringGauge> other_ptr = down_cast<StringGauge*>(other.get());
auto other_values = other_ptr->unique_values();
std::lock_guard<simple_spinlock> l(lock_);
FillUniqueValuesUnlocked();
unique_values_.insert(other_values.begin(), other_values.end());
}
void StringGauge::WriteValue(JsonWriter* writer) const {
writer->String(value());
}
// A string gauge's value can be anything, but Prometheus does not support
// non-numeric values for gauges with exception of {+,-}Inf and NaN
// (see https://prometheus.io/docs/instrumenting/exposition_formats/).
// DCHECK() is added to make sure this method is not called from anywhere,
// but overriding it is necessary since Gauge::WriteValue() is a pure virtual one.
// An alternative could be defining an empty implementation for Gauge::WriteValue()
// virtual method and not adding this empty override here.
void StringGauge::WriteValue(PrometheusWriter* /*writer*/,
const string& /*prefix*/) const {
DCHECK(false);
}
Status StringGauge::WriteAsPrometheus(PrometheusWriter* /*writer*/,
const string& /*prefix*/) const {
// Prometheus doesn't support string gauges.
// This function ensures that output written to Prometheus is empty.
return Status::OK();
}
//
// MeanGauge
//
scoped_refptr<Metric> MeanGauge::snapshot() const {
std::lock_guard<simple_spinlock> l(lock_);
auto p = new MeanGauge(down_cast<const GaugePrototype<double>*>(prototype_));
p->set_value(total_sum_, total_count_);
p->m_epoch_.store(m_epoch_);
p->invalid_for_merge_ = invalid_for_merge_;
p->retire_time_ = retire_time_;
return scoped_refptr<Metric>(p);
}
double MeanGauge::value() const {
std::lock_guard<simple_spinlock> l(lock_);
return total_count_ > 0 ? total_sum_ / total_count_
: 0.0;
}
double MeanGauge::total_sum() const {
std::lock_guard<simple_spinlock> l(lock_);
return total_sum_;
}
double MeanGauge::total_count() const {
std::lock_guard<simple_spinlock> l(lock_);
return total_count_;
}
void MeanGauge::set_value(double total_sum, double total_count) {
std::lock_guard<simple_spinlock> l(lock_);
total_sum_ = total_sum;
total_count_ = total_count;
}
void MeanGauge::MergeFrom(const scoped_refptr<Metric>& other) {
if (PREDICT_FALSE(this == other.get())) {
return;
}
if (InvalidateIfNeededInMerge(other)) {
return;
}
UpdateModificationEpoch();
scoped_refptr<MeanGauge> other_ptr = down_cast<MeanGauge*>(other.get());
std::lock_guard<simple_spinlock> l(lock_);
total_sum_ += other_ptr->total_sum();
total_count_ += other_ptr->total_count();
}
void MeanGauge::WriteValue(JsonWriter* writer) const {
writer->Double(value());
writer->String("total_sum");
writer->Double(total_sum());
writer->String("total_count");
writer->Double(total_count());
}
void MeanGauge::WriteValue(PrometheusWriter* writer, const string& prefix) const {
auto output = Substitute("$0$1{unit_type=\"$2\"} $3\n", prefix, prototype_->name(),
MetricUnit::Name(prototype_->unit()), value());
SubstituteAndAppend(&output, "$0$1$2{unit_type=\"$3\"} $4\n",
prefix, prototype_->name(), "_count",
MetricUnit::Name(prototype_->unit()), total_count());
SubstituteAndAppend(&output, "$0$1$2{unit_type=\"$3\"} $4\n",
prefix, prototype_->name(), "_sum",
MetricUnit::Name(prototype_->unit()), total_sum());
writer->WriteEntry(output);
}
//
// Counter
//
// This implementation is optimized by using a striped counter. See LongAdder for details.
scoped_refptr<Counter> CounterPrototype::Instantiate(const scoped_refptr<MetricEntity>& entity) {
  return entity->FindOrCreateCounter(this);
}

Counter::Counter(const CounterPrototype* proto) : Metric(proto) {
}

// Current counter value, aggregated across the LongAdder's stripes.
int64_t Counter::value() const {
  return value_.Value();
}

void Counter::Increment() {
  IncrementBy(1);
}

void Counter::IncrementBy(int64_t amount) {
  UpdateModificationEpoch();
  value_.IncrementBy(amount);
}

// JSON output: the prototype fields plus the current value.
Status Counter::WriteAsJson(JsonWriter* writer,
                            const MetricJsonOptions& opts) const {
  writer->StartObject();
  prototype_->WriteFields(writer, opts);
  writer->String("value");
  writer->Int64(value());
  writer->EndObject();
  return Status::OK();
}

// Prometheus output: HELP/TYPE preamble, then a single sample line.
Status Counter::WriteAsPrometheus(PrometheusWriter* writer, const string& prefix) const {
  prototype_->WriteHelpAndType(writer, prefix);
  writer->WriteEntry(Substitute("$0$1{unit_type=\"$2\"} $3\n",
                                prefix, prototype_->name(),
                                MetricUnit::Name(prototype_->unit()), value()));
  return Status::OK();
}
/////////////////////////////////////////////////
// HistogramPrototype
/////////////////////////////////////////////////
HistogramPrototype::HistogramPrototype(const MetricPrototype::CtorArgs& args,
                                       uint64_t max_trackable_value, int num_sig_digits)
    : MetricPrototype(args),
      max_trackable_value_(max_trackable_value),
      num_sig_digits_(num_sig_digits) {
  // Better to crash at definition time than at instantiation time.
  CHECK(HdrHistogram::IsValidHighestTrackableValue(max_trackable_value))
      << Substitute("Invalid max trackable value on histogram $0: $1",
                    args.name_, max_trackable_value);
  CHECK(HdrHistogram::IsValidNumSignificantDigits(num_sig_digits))
      << Substitute("Invalid number of significant digits on histogram $0: $1",
                    args.name_, num_sig_digits);
}

scoped_refptr<Histogram> HistogramPrototype::Instantiate(
    const scoped_refptr<MetricEntity>& entity) {
  return entity->FindOrCreateHistogram(this);
}

// Constructs an empty histogram sized per the prototype's bounds.
Histogram::Histogram(const HistogramPrototype* proto)
    : Metric(proto),
      histogram_(new HdrHistogram(proto->max_trackable_value(), proto->num_sig_digits())) {
}

// Constructs a histogram seeded from an existing HdrHistogram (copied).
Histogram::Histogram(const HistogramPrototype* proto, const HdrHistogram& hdr_hist)
    : Metric(proto),
      histogram_(new HdrHistogram(hdr_hist)) {
}

void Histogram::Increment(int64_t value) {
  UpdateModificationEpoch();
  histogram_->Increment(value);
}

void Histogram::IncrementBy(int64_t value, int64_t amount) {
  UpdateModificationEpoch();
  histogram_->IncrementBy(value, amount);
}

// JSON output: a protobuf-derived snapshot of the histogram.
Status Histogram::WriteAsJson(JsonWriter* writer,
                              const MetricJsonOptions& opts) const {
  HistogramSnapshotPB snapshot;
  RETURN_NOT_OK(GetHistogramSnapshotPB(&snapshot, opts));
  writer->Protobuf(snapshot);
  return Status::OK();
}
// Prometheus output: percentile '_bucket' samples, then '_sum' and '_count',
// preceded by the HELP/TYPE preamble.
Status Histogram::WriteAsPrometheus(PrometheusWriter* writer, const string& prefix) const {
  // Snapshot is taken to preserve the consistency across metrics and
  // requirements given by Prometheus. The value for the _bucket in +Inf
  // quantile needs to be equal to the total _count
  HistogramSnapshotPB snapshot;
  RETURN_NOT_OK(GetHistogramSnapshotPB(&snapshot, {}));

  const char* unit = MetricUnit::Name(prototype_->unit());
  const std::pair<const char*, uint64_t> buckets[] = {
    { "0.75",   snapshot.percentile_75() },
    { "0.95",   snapshot.percentile_95() },
    { "0.99",   snapshot.percentile_99() },
    { "0.999",  snapshot.percentile_99_9() },
    { "0.9999", snapshot.percentile_99_99() },
    { "+Inf",   snapshot.total_count() },
  };
  string output;
  for (const auto& [le, count] : buckets) {
    SubstituteAndAppend(&output, "$0$1$2{unit_type=\"$3\", le=\"$4\"} $5\n",
                        prefix, prototype_->name(), "_bucket", unit, le, count);
  }
  SubstituteAndAppend(&output, "$0$1$2{unit_type=\"$3\"} $4\n",
                      prefix, prototype_->name(), "_sum", unit,
                      snapshot.total_sum());
  SubstituteAndAppend(&output, "$0$1$2{unit_type=\"$3\"} $4\n",
                      prefix, prototype_->name(), "_count", unit,
                      snapshot.total_count());
  prototype_->WriteHelpAndType(writer, prefix);
  writer->WriteEntry(output);
  return Status::OK();
}
// Fills 'snapshot_pb' with a point-in-time snapshot of this histogram.
// Schema fields (type/label/unit/description/bounds) are included only when
// opts.include_schema_info is set; raw (value, count) arrays only when
// opts.include_raw_histograms is set.
Status Histogram::GetHistogramSnapshotPB(HistogramSnapshotPB* snapshot_pb,
                                         const MetricJsonOptions& opts) const {
  snapshot_pb->set_name(prototype_->name());
  if (opts.include_schema_info) {
    snapshot_pb->set_type(MetricType::Name(prototype_->type()));
    snapshot_pb->set_label(prototype_->label());
    snapshot_pb->set_unit(MetricUnit::Name(prototype_->unit()));
    snapshot_pb->set_description(prototype_->description());
    snapshot_pb->set_max_trackable_value(histogram_->highest_trackable_value());
    snapshot_pb->set_num_significant_digits(histogram_->num_significant_digits());
  }
  // Fast-path for a reasonably common case of an empty histogram. This occurs
  // when a histogram is tracking some information about a feature not in
  // use, for example.
  if (histogram_->TotalCount() == 0) {
    snapshot_pb->set_total_count(0);
    snapshot_pb->set_total_sum(0);
    snapshot_pb->set_min(0);
    snapshot_pb->set_mean(0);
    snapshot_pb->set_percentile_75(0);
    snapshot_pb->set_percentile_95(0);
    snapshot_pb->set_percentile_99(0);
    snapshot_pb->set_percentile_99_9(0);
    snapshot_pb->set_percentile_99_99(0);
    snapshot_pb->set_max(0);
    snapshot_pb->set_last(0);
  } else {
    // Copy the live histogram first so the statistics read below are
    // consistent with one another even if concurrent increments continue.
    HdrHistogram snapshot(*histogram_);
    snapshot_pb->set_total_count(snapshot.TotalCount());
    snapshot_pb->set_total_sum(snapshot.TotalSum());
    snapshot_pb->set_min(snapshot.MinValue());
    snapshot_pb->set_mean(snapshot.MeanValue());
    snapshot_pb->set_percentile_75(snapshot.ValueAtPercentile(75));
    snapshot_pb->set_percentile_95(snapshot.ValueAtPercentile(95));
    snapshot_pb->set_percentile_99(snapshot.ValueAtPercentile(99));
    snapshot_pb->set_percentile_99_9(snapshot.ValueAtPercentile(99.9));
    snapshot_pb->set_percentile_99_99(snapshot.ValueAtPercentile(99.99));
    snapshot_pb->set_max(snapshot.MaxValue());
    snapshot_pb->set_last(snapshot.LastValue());
    if (opts.include_raw_histograms) {
      // Dump each recorded (value, count) pair into parallel arrays.
      RecordedValuesIterator iter(&snapshot);
      while (iter.HasNext()) {
        HistogramIterationValue value;
        RETURN_NOT_OK(iter.Next(&value));
        snapshot_pb->add_values(value.value_iterated_to);
        snapshot_pb->add_counts(value.count_at_value_iterated_to);
      }
    }
  }
  return Status::OK();
}
// Number of recorded samples in the underlying bucket that holds 'value'.
uint64_t Histogram::CountInBucketForValueForTests(uint64_t value) const {
  return histogram_->CountInBucketForValue(value);
}
// Total number of samples recorded in the histogram.
uint64_t Histogram::TotalCount() const {
  return histogram_->TotalCount();
}
// Smallest recorded value (test-only accessor).
uint64_t Histogram::MinValueForTests() const {
  return histogram_->MinValue();
}
// Largest recorded value (test-only accessor).
uint64_t Histogram::MaxValueForTests() const {
  return histogram_->MaxValue();
}
// Mean of the recorded values (test-only accessor).
double Histogram::MeanValueForTests() const {
  return histogram_->MeanValue();
}
// RAII helper: records the wall time elapsed between construction and
// destruction into 'latency_hist'. A null histogram makes it a no-op.
ScopedLatencyMetric::ScopedLatencyMetric(Histogram* latency_hist)
    : latency_hist_(latency_hist) {
  if (latency_hist_ != nullptr) {
    time_started_ = MonoTime::Now();
  }
}

ScopedLatencyMetric::~ScopedLatencyMetric() {
  if (latency_hist_ != nullptr) {
    // Record the elapsed time, in microseconds, into the histogram.
    const MonoTime now = MonoTime::Now();
    latency_hist_->Increment((now - time_started_).ToMicroseconds());
  }
}
} // namespace kudu