blob: 265374c08cbb3cbe86415c186bc82dfe6f21d532 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package prometheus
import (
"bytes"
"sync"
)
import (
prom "github.com/prometheus/client_golang/prometheus"
)
import (
"dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
)
type rtMetric struct {
nameSuffix string
helpPrefix string
valueFunc func(*aggregate.Result) float64
}
func (m *rtMetric) desc(opts *RtOpts, labels []string) *prom.Desc {
return prom.NewDesc(prom.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name+m.nameSuffix), m.helpPrefix+opts.Help, labels, opts.ConstLabels)
}
var (
rtMetrics = make([]*rtMetric, 5)
aggMetrics = make([]*rtMetric, 3)
)
func init() {
rtMetrics[0] = &rtMetric{nameSuffix: "_sum", helpPrefix: "Sum ", valueFunc: func(r *aggregate.Result) float64 { return r.Total }}
rtMetrics[1] = &rtMetric{nameSuffix: "_last", helpPrefix: "Last ", valueFunc: func(r *aggregate.Result) float64 { return r.Last }}
rtMetrics[2] = &rtMetric{nameSuffix: "_min", helpPrefix: "Min ", valueFunc: func(r *aggregate.Result) float64 { return r.Min }}
rtMetrics[3] = &rtMetric{nameSuffix: "_max", helpPrefix: "Max ", valueFunc: func(r *aggregate.Result) float64 { return r.Max }}
rtMetrics[4] = &rtMetric{nameSuffix: "_avg", helpPrefix: "Average ", valueFunc: func(r *aggregate.Result) float64 { return r.Avg }}
aggMetrics[0] = &rtMetric{nameSuffix: "_avg_milliseconds_aggregate", helpPrefix: "The average ", valueFunc: func(r *aggregate.Result) float64 { return r.Avg }}
aggMetrics[1] = &rtMetric{nameSuffix: "_min_milliseconds_aggregate", helpPrefix: "The minimum ", valueFunc: func(r *aggregate.Result) float64 { return r.Min }}
aggMetrics[2] = &rtMetric{nameSuffix: "_max_milliseconds_aggregate", helpPrefix: "The maximum ", valueFunc: func(r *aggregate.Result) float64 { return r.Max }}
}
type RtOpts struct {
Namespace string
Subsystem string
Name string
Help string
ConstLabels prom.Labels
bucketNum int // only for aggRt
timeWindowSeconds int64 // only for aggRt
}
type observer interface {
Observe(val float64)
result() *aggregate.Result
}
type aggResult struct {
agg *aggregate.TimeWindowAggregator
}
func (r *aggResult) Observe(val float64) {
r.agg.Add(val)
}
func (r *aggResult) result() *aggregate.Result {
return r.agg.Result()
}
type valueResult struct {
mtx sync.RWMutex
val *aggregate.Result
}
func (r *valueResult) Observe(val float64) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.val.Update(val)
}
func (r *valueResult) result() *aggregate.Result {
res := aggregate.NewResult()
r.mtx.RLock()
res.Merge(r.val)
r.mtx.RUnlock()
return res.Get()
}
type Rt struct {
tags map[string]string
obs observer
}
func (r *Rt) Observe(val float64) {
r.obs.Observe(val)
}
func buildKey(m map[string]string, labNames []string) string {
var buffer bytes.Buffer
for _, label := range labNames {
if buffer.Len() != 0 {
buffer.WriteString("_")
}
buffer.WriteString(m[label])
}
return buffer.String()
}
func buildLabelValues(m map[string]string, labNames []string) []string {
values := make([]string, len(labNames))
for i, label := range labNames {
values[i] = m[label]
}
return values
}
type RtVec struct {
opts *RtOpts
labelNames []string
aggMap sync.Map
initFunc func(tags map[string]string) *Rt
metrics []*rtMetric
}
func NewRtVec(opts *RtOpts, labNames []string) *RtVec {
return &RtVec{
opts: opts,
labelNames: labNames,
metrics: rtMetrics,
initFunc: func(tags map[string]string) *Rt {
return &Rt{
tags: tags,
obs: &valueResult{val: aggregate.NewResult()},
}
},
}
}
func NewAggRtVec(opts *RtOpts, labNames []string) *RtVec {
return &RtVec{
opts: opts,
labelNames: labNames,
metrics: aggMetrics,
initFunc: func(tags map[string]string) *Rt {
return &Rt{
tags: tags,
obs: &aggResult{agg: aggregate.NewTimeWindowAggregator(opts.bucketNum, opts.timeWindowSeconds)},
}
},
}
}
func (r *RtVec) With(tags map[string]string) prom.Observer {
k := buildKey(tags, r.labelNames)
return r.computeIfAbsent(k, func() *Rt {
return r.initFunc(tags)
})
}
func (r *RtVec) computeIfAbsent(k string, supplier func() *Rt) *Rt {
v, ok := r.aggMap.Load(k)
if !ok {
v, _ = r.aggMap.LoadOrStore(k, supplier())
}
return v.(*Rt)
}
func (r *RtVec) Collect(ch chan<- prom.Metric) {
r.aggMap.Range(func(_, val interface{}) bool {
v := val.(*Rt)
res := v.obs.result()
for _, m := range r.metrics {
ch <- prom.MustNewConstMetric(m.desc(r.opts, r.labelNames), prom.GaugeValue, m.valueFunc(res), buildLabelValues(v.tags, r.labelNames)...)
}
return true
})
}
func (r *RtVec) Describe(ch chan<- *prom.Desc) {
for _, m := range r.metrics {
ch <- m.desc(r.opts, r.labelNames)
}
}