blob: d274dd177a2941a83a1b097318e15f522f276a0c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package metrics
import (
"sync"
)
import (
"dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
)
var (
registries = make(map[string]func(*ReporterConfig) MetricRegistry)
collectors = make([]CollectorFunc, 0)
registry MetricRegistry
)
// CollectorFunc used to extend more indicators
type CollectorFunc func(MetricRegistry, *ReporterConfig)
// Init Metrics module
func Init(config *ReporterConfig) {
if config.Enable {
// defalut protocol is already set in metricConfig
regFunc, ok := registries[config.Protocol]
if ok {
registry = regFunc(config)
for _, co := range collectors {
co(registry, config)
}
registry.Export()
}
}
}
// SetRegistry extend more MetricRegistry, default PrometheusRegistry
func SetRegistry(name string, v func(*ReporterConfig) MetricRegistry) {
registries[name] = v
}
// AddCollector add more indicators, like metadata、sla、configcenter etc
func AddCollector(name string, fun func(MetricRegistry, *ReporterConfig)) {
collectors = append(collectors, fun)
}
// MetricRegistry data container,data compute、expose、agg
type MetricRegistry interface {
Counter(*MetricId) CounterMetric // add or update a counter
Gauge(*MetricId) GaugeMetric // add or update a gauge
Histogram(*MetricId) ObservableMetric // add a metric num to a histogram
Summary(*MetricId) ObservableMetric // add a metric num to a summary
Rt(*MetricId, *RtOpts) ObservableMetric // add a metric num to a rt
Export() // expose metric data, such as Prometheus http exporter
// GetMetrics() []*MetricSample // get all metric data
// GetMetricsString() (string, error) // get text format metric data
}
type RtOpts struct {
Aggregate bool
BucketNum int // only for aggRt
TimeWindowSeconds int64 // only for aggRt
}
// multi registry,like micrometer CompositeMeterRegistry
// type CompositeRegistry struct {
// rs []MetricRegistry
// }
// Type metric type, save with micrometer
type Type uint8
const (
Counter Type = iota
Gauge
LongTaskTimer
Timer
DistributionSummary
Other
)
// MetricId
// # HELP dubbo_metadata_store_provider_succeed_total Succeed Store Provider Metadata
// # TYPE dubbo_metadata_store_provider_succeed_total gauge
// dubbo_metadata_store_provider_succeed_total{application_name="provider",hostname="localhost",interface="org.example.DemoService",ip="10.252.156.213",} 1.0
// other properties except value
type MetricId struct {
Name string
Desc string
Tags map[string]string
Type Type
}
func (m *MetricId) TagKeys() []string {
keys := make([]string, 0, len(m.Tags))
for k := range m.Tags {
keys = append(keys, k)
}
return keys
}
func NewMetricId(key *MetricKey, level MetricLevel) *MetricId {
return &MetricId{Name: key.Name, Desc: key.Desc, Tags: level.Tags()}
}
// MetricSample a metric sample,This is the final data presentation,
// not an intermediate result(like summary,histogram they will export to a set of MetricSample)
type MetricSample struct {
*MetricId
value float64
}
// CounterMetric counter metric
type CounterMetric interface {
Inc()
Add(float64)
}
// GaugeMetric gauge metric
type GaugeMetric interface {
Set(float64)
// Inc()
// Dec()
// Add(float64)
// Sub(float64)
}
// histogram summary rt metric
type ObservableMetric interface {
Observe(float64)
}
// StatesMetrics multi metrics,include total,success num, fail num,call MetricsRegistry save data
type StatesMetrics interface {
Success()
AddSuccess(float64)
Fail()
AddFailed(float64)
Inc(succ bool)
}
func NewStatesMetrics(total *MetricId, succ *MetricId, fail *MetricId, reg MetricRegistry) StatesMetrics {
return &DefaultStatesMetric{total: total, succ: succ, fail: fail, r: reg}
}
type DefaultStatesMetric struct {
r MetricRegistry
total, succ, fail *MetricId
}
func (c DefaultStatesMetric) Inc(succ bool) {
if succ {
c.Success()
} else {
c.Fail()
}
}
func (c DefaultStatesMetric) Success() {
c.r.Counter(c.total).Inc()
c.r.Counter(c.succ).Inc()
}
func (c DefaultStatesMetric) AddSuccess(v float64) {
c.r.Counter(c.total).Add(v)
c.r.Counter(c.succ).Add(v)
}
func (c DefaultStatesMetric) Fail() {
c.r.Counter(c.total).Inc()
c.r.Counter(c.fail).Inc()
}
func (c DefaultStatesMetric) AddFailed(v float64) {
c.r.Counter(c.total).Add(v)
c.r.Counter(c.fail).Add(v)
}
// TimeMetric muliti metrics, include min(Gauge)、max(Gauge)、avg(Gauge)、sum(Gauge)、last(Gauge),call MetricRegistry to expose
// see dubbo-java org.apache.dubbo.metrics.aggregate.TimeWindowAggregator
type TimeMetric interface {
Record(float64)
}
const (
defaultBucketNum = 10
defalutTimeWindowSeconds = 120
)
// NewTimeMetric init and write all data to registry
func NewTimeMetric(min, max, avg, sum, last *MetricId, mr MetricRegistry) TimeMetric {
return &DefaultTimeMetric{r: mr, min: min, max: max, avg: avg, sum: sum, last: last,
agg: aggregate.NewTimeWindowAggregator(defaultBucketNum, defalutTimeWindowSeconds)}
}
type DefaultTimeMetric struct {
r MetricRegistry
agg *aggregate.TimeWindowAggregator
min, max, avg, sum, last *MetricId
}
func (m *DefaultTimeMetric) Record(v float64) {
m.agg.Add(v)
result := m.agg.Result()
m.r.Gauge(m.max).Set(result.Max)
m.r.Gauge(m.min).Set(result.Min)
m.r.Gauge(m.avg).Set(result.Avg)
m.r.Gauge(m.sum).Set(result.Total)
m.r.Gauge(m.last).Set(v)
}
// cache if needed, TimeMetrics must cached
var metricsCache map[string]interface{} = make(map[string]interface{})
var metricsCacheMutex sync.RWMutex
func ComputeIfAbsentCache(key string, supplier func() interface{}) interface{} {
metricsCacheMutex.RLock()
v, ok := metricsCache[key]
metricsCacheMutex.RUnlock()
if ok {
return v
} else {
metricsCacheMutex.Lock()
defer metricsCacheMutex.Unlock()
v, ok = metricsCache[key] // double check,avoid overwriting
if ok {
return v
} else {
n := supplier()
metricsCache[key] = n
return n
}
}
}