| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package prometheus |
| |
| import ( |
| "strings" |
| "sync" |
| "sync/atomic" |
| "time" |
| ) |
| |
| import ( |
| "github.com/prometheus/client_golang/prometheus" |
| "github.com/prometheus/client_golang/prometheus/promauto" |
| ) |
| |
| import ( |
| "dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate" |
| ) |
| |
| func newHistogramVec(name, namespace string, labels []string) *prometheus.HistogramVec { |
| return prometheus.NewHistogramVec( |
| prometheus.HistogramOpts{ |
| Namespace: namespace, |
| Name: name, |
| Buckets: defaultHistogramBucket, |
| }, |
| labels) |
| } |
| |
| func newCounter(name, namespace string) prometheus.Counter { |
| return prometheus.NewCounter( |
| prometheus.CounterOpts{ |
| Namespace: namespace, |
| Name: name, |
| }) |
| } |
| |
| func newCounterVec(name, namespace string, labels []string) *prometheus.CounterVec { |
| return prometheus.NewCounterVec( |
| prometheus.CounterOpts{ |
| Name: name, |
| Namespace: namespace, |
| }, labels) |
| } |
| |
| func newGauge(name, namespace string) prometheus.Gauge { |
| return prometheus.NewGauge( |
| prometheus.GaugeOpts{ |
| Name: name, |
| Namespace: namespace, |
| }) |
| } |
| |
| func newGaugeVec(name, namespace string, labels []string) *prometheus.GaugeVec { |
| return prometheus.NewGaugeVec( |
| prometheus.GaugeOpts{ |
| Name: name, |
| Namespace: namespace, |
| }, labels) |
| } |
| |
| func newSummary(name, namespace string) prometheus.Summary { |
| return prometheus.NewSummary( |
| prometheus.SummaryOpts{ |
| Name: name, |
| Namespace: namespace, |
| }) |
| } |
| |
| // newSummaryVec create SummaryVec, the Namespace is dubbo |
| // the objectives is from my experience. |
| func newSummaryVec(name, namespace string, labels []string, maxAge int64) *prometheus.SummaryVec { |
| return prometheus.NewSummaryVec( |
| prometheus.SummaryOpts{ |
| Namespace: namespace, |
| Name: name, |
| Objectives: map[float64]float64{ |
| 0.5: 0.01, |
| 0.75: 0.01, |
| 0.90: 0.005, |
| 0.98: 0.002, |
| 0.99: 0.001, |
| 0.999: 0.0001, |
| }, |
| MaxAge: time.Duration(maxAge), |
| }, |
| labels, |
| ) |
| } |
| |
| // create an auto register histogram vec |
| func newAutoHistogramVec(name, namespace string, labels []string) *prometheus.HistogramVec { |
| return promauto.NewHistogramVec( |
| prometheus.HistogramOpts{ |
| Namespace: namespace, |
| Name: name, |
| Buckets: defaultHistogramBucket, |
| }, |
| labels) |
| } |
| |
| // create an auto register counter vec |
| func newAutoCounterVec(name, namespace string, labels []string) *prometheus.CounterVec { |
| return promauto.NewCounterVec( |
| prometheus.CounterOpts{ |
| Name: name, |
| Namespace: namespace, |
| }, labels) |
| } |
| |
| // create an auto register gauge vec |
| func newAutoGaugeVec(name, namespace string, labels []string) *prometheus.GaugeVec { |
| return promauto.NewGaugeVec( |
| prometheus.GaugeOpts{ |
| Name: name, |
| Namespace: namespace, |
| }, labels) |
| } |
| |
| // create an auto register summary vec |
| func newAutoSummaryVec(name, namespace string, labels []string, maxAge int64) *prometheus.SummaryVec { |
| return promauto.NewSummaryVec( |
| prometheus.SummaryOpts{ |
| Namespace: namespace, |
| Name: name, |
| Objectives: map[float64]float64{ |
| 0.5: 0.01, |
| 0.75: 0.01, |
| 0.90: 0.005, |
| 0.98: 0.002, |
| 0.99: 0.001, |
| 0.999: 0.0001, |
| }, |
| MaxAge: time.Duration(maxAge), |
| }, |
| labels, |
| ) |
| } |
| |
| type GaugeVecWithSyncMap struct { |
| GaugeVec *prometheus.GaugeVec |
| SyncMap *sync.Map // key: labels, value: *atomic.Value |
| } |
| |
| func newAutoGaugeVecWithSyncMap(name, namespace string, labels []string) *GaugeVecWithSyncMap { |
| return &GaugeVecWithSyncMap{ |
| GaugeVec: newAutoGaugeVec(name, namespace, labels), |
| SyncMap: &sync.Map{}, |
| } |
| } |
| |
| func convertLabelsToMapKey(labels prometheus.Labels) string { |
| return strings.Join([]string{ |
| labels[applicationNameKey], |
| labels[groupKey], |
| labels[hostnameKey], |
| labels[interfaceKey], |
| labels[ipKey], |
| labels[versionKey], |
| labels[methodKey], |
| }, "_") |
| } |
| |
| func (gv *GaugeVecWithSyncMap) updateMin(labels *prometheus.Labels, curValue int64) { |
| key := convertLabelsToMapKey(*labels) |
| cur := &atomic.Value{} // for first store |
| cur.Store(curValue) |
| for { |
| if actual, loaded := gv.SyncMap.LoadOrStore(key, cur); loaded { |
| store := actual.(*atomic.Value) |
| storeValue := store.Load().(int64) |
| if curValue < storeValue { |
| if store.CompareAndSwap(storeValue, curValue) { |
| // value is not changed, should update |
| gv.GaugeVec.With(*labels).Set(float64(curValue)) |
| break |
| } |
| // value has changed, continue for loop |
| } else { |
| // no need to update |
| break |
| } |
| } else { |
| // store current curValue as this labels' init value |
| gv.GaugeVec.With(*labels).Set(float64(curValue)) |
| break |
| } |
| } |
| } |
| |
| func (gv *GaugeVecWithSyncMap) updateMax(labels *prometheus.Labels, curValue int64) { |
| key := convertLabelsToMapKey(*labels) |
| cur := &atomic.Value{} // for first store |
| cur.Store(curValue) |
| for { |
| if actual, loaded := gv.SyncMap.LoadOrStore(key, cur); loaded { |
| store := actual.(*atomic.Value) |
| storeValue := store.Load().(int64) |
| if curValue > storeValue { |
| if store.CompareAndSwap(storeValue, curValue) { |
| // value is not changed, should update |
| gv.GaugeVec.With(*labels).Set(float64(curValue)) |
| break |
| } |
| // value has changed, continue for loop |
| } else { |
| // no need to update |
| break |
| } |
| } else { |
| // store current curValue as this labels' init value |
| gv.GaugeVec.With(*labels).Set(float64(curValue)) |
| break |
| } |
| } |
| } |
| |
| func (gv *GaugeVecWithSyncMap) updateAvg(labels *prometheus.Labels, curValue int64) { |
| key := convertLabelsToMapKey(*labels) |
| cur := &atomic.Value{} // for first store |
| type avgPair struct { |
| Sum int64 |
| N int64 |
| } |
| cur.Store(avgPair{Sum: curValue, N: 1}) |
| |
| for { |
| if actual, loaded := gv.SyncMap.LoadOrStore(key, cur); loaded { |
| store := actual.(*atomic.Value) |
| storeValue := store.Load().(avgPair) |
| newValue := avgPair{Sum: storeValue.Sum + curValue, N: storeValue.N + 1} |
| if store.CompareAndSwap(storeValue, newValue) { |
| // value is not changed, should update |
| gv.GaugeVec.With(*labels).Set(float64(newValue.Sum / newValue.N)) |
| break |
| } |
| } else { |
| // store current curValue as this labels' init value |
| gv.GaugeVec.With(*labels).Set(float64(curValue)) |
| break |
| } |
| } |
| } |
| |
| type quantileGaugeVec struct { |
| gaugeVecSlice []*prometheus.GaugeVec |
| quantiles []float64 |
| syncMap *sync.Map // key: labels string, value: TimeWindowQuantile |
| } |
| |
| // Notice: names and quantiles should be the same length and same order. |
| func newQuantileGaugeVec(names []string, namespace string, labels []string, quantiles []float64) *quantileGaugeVec { |
| gvs := make([]*prometheus.GaugeVec, len(names)) |
| for i, name := range names { |
| gvs[i] = newAutoGaugeVec(name, namespace, labels) |
| } |
| gv := &quantileGaugeVec{ |
| gaugeVecSlice: gvs, |
| quantiles: quantiles, |
| syncMap: &sync.Map{}, |
| } |
| return gv |
| } |
| |
| func (gv *quantileGaugeVec) updateQuantile(labels *prometheus.Labels, curValue int64) { |
| key := convertLabelsToMapKey(*labels) |
| cur := aggregate.NewTimeWindowQuantile(100, 10, 120) |
| cur.Add(float64(curValue)) |
| |
| updateFunc := func(td *aggregate.TimeWindowQuantile) { |
| qs := td.Quantiles(gv.quantiles) |
| for i, q := range qs { |
| gv.gaugeVecSlice[i].With(*labels).Set(q) |
| } |
| } |
| |
| if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded { |
| store := actual.(*aggregate.TimeWindowQuantile) |
| store.Add(float64(curValue)) |
| updateFunc(store) |
| } else { |
| updateFunc(cur) |
| } |
| } |
| |
| type qpsGaugeVec struct { |
| gaugeVec *prometheus.GaugeVec |
| syncMap *sync.Map // key: labels string, value: TimeWindowCounter |
| } |
| |
| func newQpsGaugeVec(name, namespace string, labels []string) *qpsGaugeVec { |
| return &qpsGaugeVec{ |
| gaugeVec: newAutoGaugeVec(name, namespace, labels), |
| syncMap: &sync.Map{}, |
| } |
| } |
| |
| func (gv *qpsGaugeVec) updateQps(labels *prometheus.Labels) { |
| key := convertLabelsToMapKey(*labels) |
| cur := aggregate.NewTimeWindowCounter(10, 120) |
| cur.Inc() |
| |
| if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded { |
| store := actual.(*aggregate.TimeWindowCounter) |
| store.Inc() |
| gv.gaugeVec.With(*labels).Set(store.Count() / float64(store.LivedSeconds())) |
| } else { |
| gv.gaugeVec.With(*labels).Set(cur.Count() / float64(cur.LivedSeconds())) |
| } |
| } |
| |
| type aggregateCounterGaugeVec struct { |
| gaugeVec *prometheus.GaugeVec |
| syncMap *sync.Map // key: labels string, value: TimeWindowCounter |
| } |
| |
| func newAggregateCounterGaugeVec(name, namespace string, labels []string) *aggregateCounterGaugeVec { |
| return &aggregateCounterGaugeVec{ |
| gaugeVec: newAutoGaugeVec(name, namespace, labels), |
| syncMap: &sync.Map{}, |
| } |
| } |
| |
| func (gv *aggregateCounterGaugeVec) inc(labels *prometheus.Labels) { |
| key := convertLabelsToMapKey(*labels) |
| cur := aggregate.NewTimeWindowCounter(10, 120) |
| cur.Inc() |
| |
| if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded { |
| store := actual.(*aggregate.TimeWindowCounter) |
| store.Inc() |
| gv.gaugeVec.With(*labels).Set(store.Count()) |
| } else { |
| gv.gaugeVec.With(*labels).Set(cur.Count()) |
| } |
| } |
| |
| type aggregateFunctionsGaugeVec struct { |
| min *prometheus.GaugeVec |
| max *prometheus.GaugeVec |
| avg *prometheus.GaugeVec |
| syncMap *sync.Map // key: labels string, value: TimeWindowAggregator |
| } |
| |
| func newAggregateFunctionsGaugeVec(minName, maxName, avgName, namespace string, labels []string) *aggregateFunctionsGaugeVec { |
| return &aggregateFunctionsGaugeVec{ |
| min: newAutoGaugeVec(minName, namespace, labels), |
| max: newAutoGaugeVec(maxName, namespace, labels), |
| avg: newAutoGaugeVec(avgName, namespace, labels), |
| syncMap: &sync.Map{}, |
| } |
| } |
| |
| func (gv *aggregateFunctionsGaugeVec) update(labels *prometheus.Labels, curValue int64) { |
| key := convertLabelsToMapKey(*labels) |
| cur := aggregate.NewTimeWindowAggregator(10, 120) |
| cur.Add(float64(curValue)) |
| |
| updateFunc := func(aggregator *aggregate.TimeWindowAggregator) { |
| result := aggregator.Result() |
| gv.min.With(*labels).Set(result.Min) |
| gv.max.With(*labels).Set(result.Max) |
| gv.avg.With(*labels).Set(result.Avg) |
| } |
| |
| if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded { |
| store := actual.(*aggregate.TimeWindowAggregator) |
| store.Add(float64(curValue)) |
| updateFunc(store) |
| } else { |
| updateFunc(cur) |
| } |
| } |