| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package metrics |
| |
| import ( |
| "encoding/json" |
| "sync" |
| ) |
| |
| import ( |
| "github.com/dubbogo/gost/log/logger" |
| ) |
| |
| import ( |
| "dubbo.apache.org/dubbo-go/v3/common" |
| "dubbo.apache.org/dubbo-go/v3/common/constant" |
| "dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate" |
| ) |
| |
| const ( |
| DefaultCompression = 100 |
| DefaultBucketNum = 10 |
| DefaultTimeWindowSeconds = 120 |
| ) |
| |
| var ( |
| registries = make(map[string]func(*common.URL) MetricRegistry) |
| collectors = make([]CollectorFunc, 0) |
| registry MetricRegistry |
| once sync.Once |
| ) |
| |
| // CollectorFunc used to extend more indicators |
| type CollectorFunc func(MetricRegistry, *common.URL) |
| |
| // Init Metrics module |
| func Init(url *common.URL) { |
| once.Do(func() { |
| InitAppInfo(url.GetParam(constant.ApplicationKey, ""), url.GetParam(constant.AppVersionKey, "")) |
| // default protocol is already set in metricConfig |
| regFunc, ok := registries[url.Protocol] |
| if ok { |
| registry = regFunc(url) |
| for _, co := range collectors { |
| co(registry, url) |
| } |
| registry.Export() |
| } |
| }) |
| } |
| |
| // SetRegistry extend more MetricRegistry, default PrometheusRegistry |
| func SetRegistry(name string, v func(*common.URL) MetricRegistry) { |
| registries[name] = v |
| } |
| |
| // AddCollector add more indicators, like metadata, sla, config-center etc. |
| func AddCollector(name string, fun CollectorFunc) { |
| collectors = append(collectors, fun) |
| } |
| |
| // MetricRegistry data container,data compute、expose、agg |
| type MetricRegistry interface { |
| Counter(*MetricId) CounterMetric // add or update a counter |
| Gauge(*MetricId) GaugeMetric // add or update a gauge |
| Histogram(*MetricId) ObservableMetric // add a metric num to a histogram |
| Summary(*MetricId) ObservableMetric // add a metric num to a summary |
| Rt(*MetricId, *RtOpts) ObservableMetric // add a metric num to a rt |
| Export() // expose metric data, such as Prometheus http exporter |
| // GetMetrics() []*MetricSample // get all metric data |
| // GetMetricsString() (string, error) // get text format metric data |
| } |
| |
| type RtOpts struct { |
| Aggregate bool |
| BucketNum int // only for aggRt |
| TimeWindowSeconds int64 // only for aggRt |
| } |
| |
| // multi registry,like micrometer CompositeMeterRegistry |
| // type CompositeRegistry struct { |
| // rs []MetricRegistry |
| // } |
| |
| // Type metric type, same with micrometer |
| type Type uint8 // TODO check if Type is is useful |
| |
| const ( |
| Counter Type = iota |
| Gauge |
| LongTaskTimer |
| Timer |
| DistributionSummary |
| Other |
| ) |
| |
| // MetricId |
| // # HELP dubbo_metadata_store_provider_succeed_total Succeed Store Provider Metadata |
| // # TYPE dubbo_metadata_store_provider_succeed_total gauge |
| // dubbo_metadata_store_provider_succeed_total{application_name="provider",hostname="localhost",interface="org.example.DemoService",ip="10.252.156.213",} 1.0 |
| // other properties except value |
| type MetricId struct { |
| Name string |
| Desc string |
| Tags map[string]string // also named label |
| Type Type // TODO check if this field is useful |
| } |
| |
| func (m *MetricId) TagKeys() []string { |
| keys := make([]string, 0, len(m.Tags)) |
| for k := range m.Tags { |
| keys = append(keys, k) |
| } |
| return keys |
| } |
| |
| func NewMetricId(key *MetricKey, level MetricLevel) *MetricId { |
| return &MetricId{Name: key.Name, Desc: key.Desc, Tags: level.Tags()} |
| } |
| |
| // NewMetricIdByLabels create a MetricId by key and labels |
| func NewMetricIdByLabels(key *MetricKey, labels map[string]string) *MetricId { |
| return &MetricId{Name: key.Name, Desc: key.Desc, Tags: labels} |
| } |
| |
| // MetricSample a metric sample,This is the final data presentation, |
| // not an intermediate result(like summary,histogram they will export to a set of MetricSample) |
| type MetricSample struct { |
| *MetricId |
| value float64 |
| } |
| |
| // CounterMetric counter metric |
| type CounterMetric interface { |
| Inc() |
| Add(float64) |
| } |
| |
| // GaugeMetric gauge metric |
| type GaugeMetric interface { |
| Set(float64) |
| Inc() |
| Dec() |
| Add(float64) |
| Sub(float64) |
| } |
| |
| // histogram summary rt metric |
| type ObservableMetric interface { |
| Observe(float64) |
| } |
| |
| type BaseCollector struct { |
| R MetricRegistry |
| } |
| |
| func (c *BaseCollector) StateCount(total, succ, fail *MetricKey, level MetricLevel, succed bool) { |
| c.R.Counter(NewMetricId(total, level)).Inc() |
| if succed { |
| c.R.Counter(NewMetricId(succ, level)).Inc() |
| } else { |
| c.R.Counter(NewMetricId(fail, level)).Inc() |
| } |
| } |
| |
| // CounterVec means a set of counters with the same metricKey but different labels |
| type CounterVec interface { |
| Inc(labels map[string]string) |
| Add(labels map[string]string, v float64) |
| } |
| |
| // NewCounterVec create a CounterVec default implementation. |
| func NewCounterVec(metricRegistry MetricRegistry, metricKey *MetricKey) CounterVec { |
| return &DefaultCounterVec{ |
| metricRegistry: metricRegistry, |
| metricKey: metricKey, |
| } |
| } |
| |
| // DefaultCounterVec is a default CounterVec implementation. |
| type DefaultCounterVec struct { |
| metricRegistry MetricRegistry |
| metricKey *MetricKey |
| } |
| |
| func (d *DefaultCounterVec) Inc(labels map[string]string) { |
| d.metricRegistry.Counter(NewMetricIdByLabels(d.metricKey, labels)).Inc() |
| } |
| |
| func (d *DefaultCounterVec) Add(labels map[string]string, v float64) { |
| d.metricRegistry.Counter(NewMetricIdByLabels(d.metricKey, labels)).Add(v) |
| } |
| |
| // GaugeVec means a set of gauges with the same metricKey but different labels |
| type GaugeVec interface { |
| Set(labels map[string]string, v float64) |
| Inc(labels map[string]string) |
| Dec(labels map[string]string) |
| Add(labels map[string]string, v float64) |
| Sub(labels map[string]string, v float64) |
| } |
| |
| // NewGaugeVec create a GaugeVec default implementation. |
| func NewGaugeVec(metricRegistry MetricRegistry, metricKey *MetricKey) GaugeVec { |
| return &DefaultGaugeVec{ |
| metricRegistry: metricRegistry, |
| metricKey: metricKey, |
| } |
| } |
| |
| // DefaultGaugeVec is a default GaugeVec implementation. |
| type DefaultGaugeVec struct { |
| metricRegistry MetricRegistry |
| metricKey *MetricKey |
| } |
| |
| func (d *DefaultGaugeVec) Set(labels map[string]string, v float64) { |
| d.metricRegistry.Gauge(NewMetricIdByLabels(d.metricKey, labels)).Set(v) |
| } |
| |
| func (d *DefaultGaugeVec) Inc(labels map[string]string) { |
| d.metricRegistry.Gauge(NewMetricIdByLabels(d.metricKey, labels)).Inc() |
| } |
| |
| func (d *DefaultGaugeVec) Dec(labels map[string]string) { |
| d.metricRegistry.Gauge(NewMetricIdByLabels(d.metricKey, labels)).Dec() |
| } |
| |
| func (d *DefaultGaugeVec) Add(labels map[string]string, v float64) { |
| d.metricRegistry.Gauge(NewMetricIdByLabels(d.metricKey, labels)).Add(v) |
| } |
| |
| func (d *DefaultGaugeVec) Sub(labels map[string]string, v float64) { |
| d.metricRegistry.Gauge(NewMetricIdByLabels(d.metricKey, labels)).Sub(v) |
| } |
| |
| // RtVec means a set of rt metrics with the same metricKey but different labels |
| type RtVec interface { |
| Record(labels map[string]string, v float64) |
| } |
| |
| // NewRtVec create a RtVec default implementation DefaultRtVec. |
| func NewRtVec(metricRegistry MetricRegistry, metricKey *MetricKey, rtOpts *RtOpts) RtVec { |
| return &DefaultRtVec{ |
| metricRegistry: metricRegistry, |
| metricKey: metricKey, |
| rtOpts: rtOpts, |
| } |
| } |
| |
| // DefaultRtVec is a default RtVec implementation. |
| // |
| // If rtOpts.Aggregate is true, it will use the aggregate.TimeWindowAggregator with local aggregation, |
| // else it will use the aggregate.Result without aggregation. |
| type DefaultRtVec struct { |
| metricRegistry MetricRegistry |
| metricKey *MetricKey |
| rtOpts *RtOpts |
| } |
| |
| func (d *DefaultRtVec) Record(labels map[string]string, v float64) { |
| d.metricRegistry.Rt(NewMetricIdByLabels(d.metricKey, labels), d.rtOpts).Observe(v) |
| } |
| |
| // labelsToString convert @labels to json format string for cache key |
| func labelsToString(labels map[string]string) string { |
| labelsJson, err := json.Marshal(labels) |
| if err != nil { |
| logger.Errorf("json.Marshal(labels) = error:%v", err) |
| return "" |
| } |
| return string(labelsJson) |
| } |
| |
| // QpsMetricVec means a set of qps metrics with the same metricKey but different labels. |
| type QpsMetricVec interface { |
| Record(labels map[string]string) |
| } |
| |
| func NewQpsMetricVec(metricRegistry MetricRegistry, metricKey *MetricKey) QpsMetricVec { |
| return &DefaultQpsMetricVec{ |
| metricRegistry: metricRegistry, |
| metricKey: metricKey, |
| mux: sync.RWMutex{}, |
| cache: make(map[string]*aggregate.TimeWindowCounter), |
| } |
| } |
| |
| // DefaultQpsMetricVec is a default QpsMetricVec implementation. |
| // |
| // It is concurrent safe, and it uses the aggregate.TimeWindowCounter to store and calculate the qps metrics. |
| type DefaultQpsMetricVec struct { |
| metricRegistry MetricRegistry |
| metricKey *MetricKey |
| mux sync.RWMutex |
| cache map[string]*aggregate.TimeWindowCounter // key: metrics labels, value: TimeWindowCounter |
| } |
| |
| func (d *DefaultQpsMetricVec) Record(labels map[string]string) { |
| key := labelsToString(labels) |
| if key == "" { |
| return |
| } |
| d.mux.RLock() |
| twc, ok := d.cache[key] |
| d.mux.RUnlock() |
| if !ok { |
| d.mux.Lock() |
| twc, ok = d.cache[key] |
| if !ok { |
| twc = aggregate.NewTimeWindowCounter(DefaultBucketNum, DefaultTimeWindowSeconds) |
| d.cache[key] = twc |
| } |
| d.mux.Unlock() |
| } |
| twc.Inc() |
| d.metricRegistry.Gauge(NewMetricIdByLabels(d.metricKey, labels)).Set(twc.Count() / float64(twc.LivedSeconds())) |
| } |
| |
| // AggregateCounterVec means a set of aggregate counter metrics with the same metricKey but different labels. |
| type AggregateCounterVec interface { |
| Inc(labels map[string]string) |
| } |
| |
| func NewAggregateCounterVec(metricRegistry MetricRegistry, metricKey *MetricKey) AggregateCounterVec { |
| return &DefaultAggregateCounterVec{ |
| metricRegistry: metricRegistry, |
| metricKey: metricKey, |
| mux: sync.RWMutex{}, |
| cache: make(map[string]*aggregate.TimeWindowCounter), |
| } |
| } |
| |
| // DefaultAggregateCounterVec is a default AggregateCounterVec implementation. |
| // |
| // It is concurrent safe, and it uses the aggregate.TimeWindowCounter to store and calculate the aggregate counter metrics. |
| type DefaultAggregateCounterVec struct { |
| metricRegistry MetricRegistry |
| metricKey *MetricKey |
| mux sync.RWMutex |
| cache map[string]*aggregate.TimeWindowCounter // key: metrics labels, value: TimeWindowCounter |
| } |
| |
| func (d *DefaultAggregateCounterVec) Inc(labels map[string]string) { |
| key := labelsToString(labels) |
| if key == "" { |
| return |
| } |
| d.mux.RLock() |
| twc, ok := d.cache[key] |
| d.mux.RUnlock() |
| if !ok { |
| d.mux.Lock() |
| twc, ok = d.cache[key] |
| if !ok { |
| twc = aggregate.NewTimeWindowCounter(DefaultBucketNum, DefaultTimeWindowSeconds) |
| d.cache[key] = twc |
| } |
| d.mux.Unlock() |
| } |
| twc.Inc() |
| d.metricRegistry.Gauge(NewMetricIdByLabels(d.metricKey, labels)).Set(twc.Count()) |
| } |
| |
| // QuantileMetricVec means a set of quantile metrics with the same metricKey but different labels. |
| type QuantileMetricVec interface { |
| Record(labels map[string]string, v float64) |
| } |
| |
| func NewQuantileMetricVec(metricRegistry MetricRegistry, metricKeys []*MetricKey, quantiles []float64) QuantileMetricVec { |
| return &DefaultQuantileMetricVec{ |
| metricRegistry: metricRegistry, |
| metricKeys: metricKeys, |
| mux: sync.RWMutex{}, |
| cache: make(map[string]*aggregate.TimeWindowQuantile), |
| quantiles: quantiles, |
| } |
| } |
| |
| // DefaultQuantileMetricVec is a default QuantileMetricVec implementation. |
| // |
| // It is concurrent safe, and it uses the aggregate.TimeWindowQuantile to store and calculate the quantile metrics. |
| type DefaultQuantileMetricVec struct { |
| metricRegistry MetricRegistry |
| metricKeys []*MetricKey |
| mux sync.RWMutex |
| cache map[string]*aggregate.TimeWindowQuantile // key: metrics labels, value: TimeWindowQuantile |
| quantiles []float64 |
| } |
| |
| func (d *DefaultQuantileMetricVec) Record(labels map[string]string, v float64) { |
| key := labelsToString(labels) |
| if key == "" { |
| return |
| } |
| d.mux.RLock() |
| twq, ok := d.cache[key] |
| d.mux.RUnlock() |
| if !ok { |
| d.mux.Lock() |
| twq, ok = d.cache[key] |
| if !ok { |
| twq = aggregate.NewTimeWindowQuantile(DefaultCompression, DefaultBucketNum, DefaultTimeWindowSeconds) |
| d.cache[key] = twq |
| } |
| d.mux.Unlock() |
| } |
| twq.Add(v) |
| |
| for i, q := range twq.Quantiles(d.quantiles) { |
| d.metricRegistry.Gauge(NewMetricIdByLabels(d.metricKeys[i], labels)).Set(q) |
| } |
| } |