refactor: make rt metric more performance and easy to use (#2376)
diff --git a/common/host_util.go b/common/host_util.go
index 5c411ab..eac0ac1 100644
--- a/common/host_util.go
+++ b/common/host_util.go
@@ -31,8 +31,10 @@
"dubbo.apache.org/dubbo-go/v3/common/constant"
)
-var localIp string
-var localHostname string
+var (
+ localIp string
+ localHostname string
+)
func GetLocalIp() string {
if len(localIp) != 0 {
diff --git a/common/host_util_test.go b/common/host_util_test.go
index b78c4cf..4febece 100644
--- a/common/host_util_test.go
+++ b/common/host_util_test.go
@@ -34,6 +34,10 @@
assert.NotNil(t, GetLocalIp())
}
+func TestGetLocalHostName(t *testing.T) {
+ assert.NotNil(t, GetLocalHostName())
+}
+
func TestHandleRegisterIPAndPort(t *testing.T) {
url := NewURLWithOptions(WithIp("1.2.3.4"), WithPort("20000"))
HandleRegisterIPAndPort(url)
diff --git a/metrics/api.go b/metrics/api.go
index fdc69c1..d274dd1 100644
--- a/metrics/api.go
+++ b/metrics/api.go
@@ -25,9 +25,11 @@
"dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
)
-var registries = make(map[string]func(*ReporterConfig) MetricRegistry)
-var collectors = make([]CollectorFunc, 0)
-var registry MetricRegistry
+var (
+ registries = make(map[string]func(*ReporterConfig) MetricRegistry)
+ collectors = make([]CollectorFunc, 0)
+ registry MetricRegistry
+)
// CollectorFunc used to extend more indicators
type CollectorFunc func(MetricRegistry, *ReporterConfig)
@@ -59,15 +61,22 @@
// MetricRegistry data container,data compute、expose、agg
type MetricRegistry interface {
- Counter(*MetricId) CounterMetric // add or update a counter
- Gauge(*MetricId) GaugeMetric // add or update a gauge
- Histogram(*MetricId) HistogramMetric // add a metric num to a histogram
- Summary(*MetricId) SummaryMetric // add a metric num to a summary
- Export() // expose metric data, such as Prometheus http exporter
+ Counter(*MetricId) CounterMetric // add or update a counter
+ Gauge(*MetricId) GaugeMetric // add or update a gauge
+ Histogram(*MetricId) ObservableMetric // add a metric num to a histogram
+ Summary(*MetricId) ObservableMetric // add a metric num to a summary
+ Rt(*MetricId, *RtOpts) ObservableMetric // add a metric num to a rt
+ Export() // expose metric data, such as Prometheus http exporter
// GetMetrics() []*MetricSample // get all metric data
// GetMetricsString() (string, error) // get text format metric data
}
+type RtOpts struct {
+ Aggregate bool
+ BucketNum int // only for aggRt
+ TimeWindowSeconds int64 // only for aggRt
+}
+
// multi registry,like micrometer CompositeMeterRegistry
// type CompositeRegistry struct {
// rs []MetricRegistry
@@ -131,14 +140,9 @@
// Sub(float64)
}
-// HistogramMetric histogram metric
-type HistogramMetric interface {
- Record(float64)
-}
-
-// SummaryMetric summary metric
-type SummaryMetric interface {
- Record(float64)
+// histogram summary rt metric
+type ObservableMetric interface {
+ Observe(float64)
}
// StatesMetrics multi metrics,include total,success num, fail num,call MetricsRegistry save data
@@ -232,8 +236,13 @@
} else {
metricsCacheMutex.Lock()
defer metricsCacheMutex.Unlock()
- n := supplier()
- metricsCache[key] = n
- return n
+ v, ok = metricsCache[key] // double check,avoid overwriting
+ if ok {
+ return v
+ } else {
+ n := supplier()
+ metricsCache[key] = n
+ return n
+ }
}
}
diff --git a/metrics/prometheus/registry.go b/metrics/prometheus/registry.go
index 70946d6..21ebc0d 100644
--- a/metrics/prometheus/registry.go
+++ b/metrics/prometheus/registry.go
@@ -24,7 +24,6 @@
import (
prom "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/expfmt"
)
@@ -35,97 +34,96 @@
func init() {
metrics.SetRegistry("prometheus", func(rc *metrics.ReporterConfig) metrics.MetricRegistry {
- return &promMetricRegistry{
- cvm: make(map[string]*prom.CounterVec),
- gvm: make(map[string]*prom.GaugeVec),
- hvm: make(map[string]*prom.HistogramVec),
- svm: make(map[string]*prom.SummaryVec),
- }
+ return &promMetricRegistry{r: prom.DefaultRegisterer}
})
}
type promMetricRegistry struct {
- mtx sync.RWMutex // Protects metrics.
- cvm map[string]*prom.CounterVec // prom.CounterVec
- gvm map[string]*prom.GaugeVec // prom.GaugeVec
- hvm map[string]*prom.HistogramVec // prom.HistogramVec
- svm map[string]*prom.SummaryVec // prom.SummaryVec
+ r prom.Registerer // for convenience of testing
+ vecs sync.Map
+}
+
+func (p *promMetricRegistry) getOrComputeVec(key string, supplier func() interface{}) interface{} {
+ v, ok := p.vecs.Load(key)
+ if !ok {
+ v, ok = p.vecs.LoadOrStore(key, supplier())
+ if !ok {
+ p.r.MustRegister(v.(prom.Collector)) // only registe collector which stored success
+ }
+ }
+ return v
}
func (p *promMetricRegistry) Counter(m *metrics.MetricId) metrics.CounterMetric {
- p.mtx.RLock()
- vec, ok := p.cvm[m.Name]
- p.mtx.RUnlock()
- if !ok {
- p.mtx.Lock()
- vec = promauto.NewCounterVec(prom.CounterOpts{
+ vec := p.getOrComputeVec(m.Name, func() interface{} {
+ return prom.NewCounterVec(prom.CounterOpts{
Name: m.Name,
Help: m.Desc,
}, m.TagKeys())
- p.cvm[m.Name] = vec
- p.mtx.Unlock()
- }
- c := vec.With(m.Tags)
- return &counter{pc: c}
+ }).(*prom.CounterVec)
+ return vec.With(m.Tags)
}
func (p *promMetricRegistry) Gauge(m *metrics.MetricId) metrics.GaugeMetric {
- p.mtx.RLock()
- vec, ok := p.gvm[m.Name]
- p.mtx.RUnlock()
- if !ok {
- p.mtx.Lock()
- vec = promauto.NewGaugeVec(prom.GaugeOpts{
+ vec := p.getOrComputeVec(m.Name, func() interface{} {
+ return prom.NewGaugeVec(prom.GaugeOpts{
Name: m.Name,
Help: m.Desc,
}, m.TagKeys())
- p.gvm[m.Name] = vec
- p.mtx.Unlock()
- }
- g := vec.With(m.Tags)
- return &gauge{pg: g}
+ }).(*prom.GaugeVec)
+ return vec.With(m.Tags)
}
-func (p *promMetricRegistry) Histogram(m *metrics.MetricId) metrics.HistogramMetric {
- p.mtx.RLock()
- vec, ok := p.hvm[m.Name]
- p.mtx.RUnlock()
- if !ok {
- p.mtx.Lock()
- vec = promauto.NewHistogramVec(prom.HistogramOpts{
+func (p *promMetricRegistry) Histogram(m *metrics.MetricId) metrics.ObservableMetric {
+ vec := p.getOrComputeVec(m.Name, func() interface{} {
+ return prom.NewHistogramVec(prom.HistogramOpts{
Name: m.Name,
Help: m.Desc,
}, m.TagKeys())
- p.hvm[m.Name] = vec
- p.mtx.Unlock()
- }
- h := vec.With(m.Tags)
- return &histogram{ph: h.(prom.Histogram)}
+ }).(*prom.HistogramVec)
+ return vec.With(m.Tags)
}
-func (p *promMetricRegistry) Summary(m *metrics.MetricId) metrics.SummaryMetric {
- p.mtx.RLock()
- vec, ok := p.svm[m.Name]
- p.mtx.RUnlock()
- if !ok {
- p.mtx.Lock()
- vec = promauto.NewSummaryVec(prom.SummaryOpts{
+func (p *promMetricRegistry) Summary(m *metrics.MetricId) metrics.ObservableMetric {
+ vec := p.getOrComputeVec(m.Name, func() interface{} {
+ return prom.NewSummaryVec(prom.SummaryOpts{
Name: m.Name,
Help: m.Desc,
}, m.TagKeys())
- p.svm[m.Name] = vec
- p.mtx.Unlock()
+ }).(*prom.SummaryVec)
+ return vec.With(m.Tags)
+}
+
+func (p *promMetricRegistry) Rt(m *metrics.MetricId, opts *metrics.RtOpts) metrics.ObservableMetric {
+ var supplier func() interface{}
+ if opts != nil && opts.Aggregate {
+ supplier = func() interface{} {
+ // TODO set default aggregate config from config
+ return NewAggRtVec(&RtOpts{
+ Name: m.Name,
+ Help: m.Desc,
+ bucketNum: opts.BucketNum,
+ timeWindowSeconds: opts.TimeWindowSeconds,
+ }, m.TagKeys())
+ }
+ } else {
+ supplier = func() interface{} {
+ return NewRtVec(&RtOpts{
+ Name: m.Name,
+ Help: m.Desc,
+ }, m.TagKeys())
+ }
}
- s := vec.With(m.Tags)
- return &summary{ps: s.(prom.Summary)}
+ vec := p.getOrComputeVec(m.Name, supplier).(*RtVec)
+ return vec.With(m.Tags)
}
func (p *promMetricRegistry) Export() {
-
+ // use promauto export global, TODO move here
}
func (p *promMetricRegistry) Scrape() (string, error) {
- r := prom.DefaultRegisterer.(*prom.Registry)
+ r := p.r.(prom.Gatherer)
gathering, err := r.Gather()
if err != nil {
return "", err
@@ -138,52 +136,3 @@
}
return out.String(), nil
}
-
-type counter struct {
- pc prom.Counter
-}
-
-func (c *counter) Inc() {
- c.pc.Inc()
-}
-func (c *counter) Add(v float64) {
- c.pc.Add(v)
-}
-
-type gauge struct {
- pg prom.Gauge
-}
-
-// func (g *gauge) Inc() {
-// g.pg.Inc()
-// }
-//
-// func (g *gauge) Dec() {
-// g.pg.Dec()
-// }
-func (g *gauge) Set(v float64) {
- g.pg.Set(v)
-}
-
-// func (g *gauge) Add(v float64) {
-// g.pg.Add(v)
-// }
-// func (g *gauge) Sub(v float64) {
-// g.pg.Sub(v)
-// }
-
-type histogram struct {
- ph prom.Histogram
-}
-
-func (h *histogram) Record(v float64) {
- h.ph.Observe(v)
-}
-
-type summary struct {
- ps prom.Summary
-}
-
-func (s *summary) Record(v float64) {
- s.ps.Observe(v)
-}
diff --git a/metrics/prometheus/registry_test.go b/metrics/prometheus/registry_test.go
new file mode 100644
index 0000000..7e7ac2a
--- /dev/null
+++ b/metrics/prometheus/registry_test.go
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package prometheus
+
+import (
+ "sync"
+ "testing"
+)
+
+import (
+ prom "github.com/prometheus/client_golang/prometheus"
+
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+)
+
+var (
+ tags = map[string]string{"app": "dubbo", "version": "1.0.0"}
+ metricId = &metrics.MetricId{Name: "dubbo_request", Desc: "request", Tags: tags}
+)
+
+func TestPromMetricRegistryCounter(t *testing.T) {
+ p := &promMetricRegistry{r: prom.NewRegistry()}
+ p.Counter(metricId).Inc()
+ text, err := p.Scrape()
+ assert.Nil(t, err)
+ assert.Contains(t, text, "# HELP dubbo_request request\n# TYPE dubbo_request counter")
+ assert.Contains(t, text, `dubbo_request{app="dubbo",version="1.0.0"} 1`)
+}
+
+func TestPromMetricRegistryGauge(t *testing.T) {
+ p := &promMetricRegistry{r: prom.NewRegistry()}
+ p.Gauge(metricId).Set(100)
+ text, err := p.Scrape()
+ assert.Nil(t, err)
+ assert.Contains(t, text, "# HELP dubbo_request request\n# TYPE dubbo_request gauge")
+ assert.Contains(t, text, `dubbo_request{app="dubbo",version="1.0.0"} 100`)
+
+}
+
+func TestPromMetricRegistryHistogram(t *testing.T) {
+ p := &promMetricRegistry{r: prom.NewRegistry()}
+ p.Histogram(metricId).Observe(100)
+ text, err := p.Scrape()
+ assert.Nil(t, err)
+ assert.Contains(t, text, "# HELP dubbo_request request\n# TYPE dubbo_request histogram")
+ assert.Contains(t, text, `dubbo_request_bucket{app="dubbo",version="1.0.0",le="+Inf"} 1`)
+ assert.Contains(t, text, `dubbo_request_sum{app="dubbo",version="1.0.0"} 100`)
+ assert.Contains(t, text, `dubbo_request_count{app="dubbo",version="1.0.0"} 1`)
+}
+
+func TestPromMetricRegistrySummary(t *testing.T) {
+ p := &promMetricRegistry{r: prom.NewRegistry()}
+ p.Summary(metricId).Observe(100)
+ text, err := p.Scrape()
+ assert.Nil(t, err)
+ assert.Contains(t, text, "# HELP dubbo_request request\n# TYPE dubbo_request summary")
+ assert.Contains(t, text, "dubbo_request_sum{app=\"dubbo\",version=\"1.0.0\"} 100")
+ assert.Contains(t, text, "dubbo_request_count{app=\"dubbo\",version=\"1.0.0\"} 1")
+}
+
+func TestPromMetricRegistryRt(t *testing.T) {
+ p := &promMetricRegistry{r: prom.NewRegistry()}
+ for i := 0; i < 10; i++ {
+ p.Rt(metricId, &metrics.RtOpts{}).Observe(10 * float64(i))
+ }
+ text, err := p.Scrape()
+ assert.Nil(t, err)
+ assert.Contains(t, text, "# HELP dubbo_request_avg Average request\n# TYPE dubbo_request_avg gauge\ndubbo_request_avg{app=\"dubbo\",version=\"1.0.0\"} 45")
+ assert.Contains(t, text, "# HELP dubbo_request_last Last request\n# TYPE dubbo_request_last gauge\ndubbo_request_last{app=\"dubbo\",version=\"1.0.0\"} 90")
+ assert.Contains(t, text, "# HELP dubbo_request_max Max request\n# TYPE dubbo_request_max gauge\ndubbo_request_max{app=\"dubbo\",version=\"1.0.0\"} 90")
+ assert.Contains(t, text, "# HELP dubbo_request_min Min request\n# TYPE dubbo_request_min gauge\ndubbo_request_min{app=\"dubbo\",version=\"1.0.0\"} 0")
+ assert.Contains(t, text, "# HELP dubbo_request_sum Sum request\n# TYPE dubbo_request_sum gauge\ndubbo_request_sum{app=\"dubbo\",version=\"1.0.0\"} 450")
+
+ p = &promMetricRegistry{r: prom.NewRegistry()}
+ for i := 0; i < 10; i++ {
+ p.Rt(metricId, &metrics.RtOpts{Aggregate: true, BucketNum: 10, TimeWindowSeconds: 60}).Observe(10 * float64(i))
+ }
+ text, err = p.Scrape()
+ assert.Nil(t, err)
+ assert.Contains(t, text, "# HELP dubbo_request_avg_milliseconds_aggregate The average request\n# TYPE dubbo_request_avg_milliseconds_aggregate gauge\ndubbo_request_avg_milliseconds_aggregate{app=\"dubbo\",version=\"1.0.0\"} 45")
+ assert.Contains(t, text, "# HELP dubbo_request_max_milliseconds_aggregate The maximum request\n# TYPE dubbo_request_max_milliseconds_aggregate gauge\ndubbo_request_max_milliseconds_aggregate{app=\"dubbo\",version=\"1.0.0\"} 90")
+ assert.Contains(t, text, "# HELP dubbo_request_min_milliseconds_aggregate The minimum request\n# TYPE dubbo_request_min_milliseconds_aggregate gauge\ndubbo_request_min_milliseconds_aggregate{app=\"dubbo\",version=\"1.0.0\"} 0")
+}
+
+func TestPromMetricRegistryCounterConcurrent(t *testing.T) {
+ p := &promMetricRegistry{r: prom.NewRegistry()}
+ var wg sync.WaitGroup
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func() {
+ p.Counter(metricId).Inc()
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ text, err := p.Scrape()
+ assert.Nil(t, err)
+ assert.Contains(t, text, "# HELP dubbo_request request\n# TYPE dubbo_request counter")
+ assert.Contains(t, text, `dubbo_request{app="dubbo",version="1.0.0"} 10`)
+}
diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go
index 715fd97..0e9fedc 100644
--- a/metrics/prometheus/reporter.go
+++ b/metrics/prometheus/reporter.go
@@ -26,6 +26,7 @@
import (
"github.com/dubbogo/gost/log/logger"
+
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
diff --git a/metrics/prometheus/rt_vec.go b/metrics/prometheus/rt_vec.go
new file mode 100644
index 0000000..265374c
--- /dev/null
+++ b/metrics/prometheus/rt_vec.go
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package prometheus
+
+import (
+ "bytes"
+ "sync"
+)
+
+import (
+ prom "github.com/prometheus/client_golang/prometheus"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
+)
+
+type rtMetric struct {
+ nameSuffix string
+ helpPrefix string
+ valueFunc func(*aggregate.Result) float64
+}
+
+func (m *rtMetric) desc(opts *RtOpts, labels []string) *prom.Desc {
+ return prom.NewDesc(prom.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name+m.nameSuffix), m.helpPrefix+opts.Help, labels, opts.ConstLabels)
+}
+
+var (
+ rtMetrics = make([]*rtMetric, 5)
+ aggMetrics = make([]*rtMetric, 3)
+)
+
+func init() {
+ rtMetrics[0] = &rtMetric{nameSuffix: "_sum", helpPrefix: "Sum ", valueFunc: func(r *aggregate.Result) float64 { return r.Total }}
+ rtMetrics[1] = &rtMetric{nameSuffix: "_last", helpPrefix: "Last ", valueFunc: func(r *aggregate.Result) float64 { return r.Last }}
+ rtMetrics[2] = &rtMetric{nameSuffix: "_min", helpPrefix: "Min ", valueFunc: func(r *aggregate.Result) float64 { return r.Min }}
+ rtMetrics[3] = &rtMetric{nameSuffix: "_max", helpPrefix: "Max ", valueFunc: func(r *aggregate.Result) float64 { return r.Max }}
+ rtMetrics[4] = &rtMetric{nameSuffix: "_avg", helpPrefix: "Average ", valueFunc: func(r *aggregate.Result) float64 { return r.Avg }}
+
+ aggMetrics[0] = &rtMetric{nameSuffix: "_avg_milliseconds_aggregate", helpPrefix: "The average ", valueFunc: func(r *aggregate.Result) float64 { return r.Avg }}
+ aggMetrics[1] = &rtMetric{nameSuffix: "_min_milliseconds_aggregate", helpPrefix: "The minimum ", valueFunc: func(r *aggregate.Result) float64 { return r.Min }}
+ aggMetrics[2] = &rtMetric{nameSuffix: "_max_milliseconds_aggregate", helpPrefix: "The maximum ", valueFunc: func(r *aggregate.Result) float64 { return r.Max }}
+}
+
+type RtOpts struct {
+ Namespace string
+ Subsystem string
+ Name string
+ Help string
+ ConstLabels prom.Labels
+ bucketNum int // only for aggRt
+ timeWindowSeconds int64 // only for aggRt
+}
+
+type observer interface {
+ Observe(val float64)
+ result() *aggregate.Result
+}
+
+type aggResult struct {
+ agg *aggregate.TimeWindowAggregator
+}
+
+func (r *aggResult) Observe(val float64) {
+ r.agg.Add(val)
+}
+
+func (r *aggResult) result() *aggregate.Result {
+ return r.agg.Result()
+}
+
+type valueResult struct {
+ mtx sync.RWMutex
+ val *aggregate.Result
+}
+
+func (r *valueResult) Observe(val float64) {
+ r.mtx.Lock()
+ defer r.mtx.Unlock()
+ r.val.Update(val)
+}
+
+func (r *valueResult) result() *aggregate.Result {
+ res := aggregate.NewResult()
+ r.mtx.RLock()
+ res.Merge(r.val)
+ r.mtx.RUnlock()
+ return res.Get()
+}
+
+type Rt struct {
+ tags map[string]string
+ obs observer
+}
+
+func (r *Rt) Observe(val float64) {
+ r.obs.Observe(val)
+}
+
+func buildKey(m map[string]string, labNames []string) string {
+ var buffer bytes.Buffer
+ for _, label := range labNames {
+ if buffer.Len() != 0 {
+ buffer.WriteString("_")
+ }
+ buffer.WriteString(m[label])
+ }
+ return buffer.String()
+}
+
+func buildLabelValues(m map[string]string, labNames []string) []string {
+ values := make([]string, len(labNames))
+ for i, label := range labNames {
+ values[i] = m[label]
+ }
+ return values
+}
+
+type RtVec struct {
+ opts *RtOpts
+ labelNames []string
+ aggMap sync.Map
+ initFunc func(tags map[string]string) *Rt
+ metrics []*rtMetric
+}
+
+func NewRtVec(opts *RtOpts, labNames []string) *RtVec {
+ return &RtVec{
+ opts: opts,
+ labelNames: labNames,
+ metrics: rtMetrics,
+ initFunc: func(tags map[string]string) *Rt {
+ return &Rt{
+ tags: tags,
+ obs: &valueResult{val: aggregate.NewResult()},
+ }
+ },
+ }
+}
+
+func NewAggRtVec(opts *RtOpts, labNames []string) *RtVec {
+ return &RtVec{
+ opts: opts,
+ labelNames: labNames,
+ metrics: aggMetrics,
+ initFunc: func(tags map[string]string) *Rt {
+ return &Rt{
+ tags: tags,
+ obs: &aggResult{agg: aggregate.NewTimeWindowAggregator(opts.bucketNum, opts.timeWindowSeconds)},
+ }
+ },
+ }
+}
+
+func (r *RtVec) With(tags map[string]string) prom.Observer {
+ k := buildKey(tags, r.labelNames)
+ return r.computeIfAbsent(k, func() *Rt {
+ return r.initFunc(tags)
+ })
+}
+
+func (r *RtVec) computeIfAbsent(k string, supplier func() *Rt) *Rt {
+ v, ok := r.aggMap.Load(k)
+ if !ok {
+ v, _ = r.aggMap.LoadOrStore(k, supplier())
+ }
+ return v.(*Rt)
+}
+
+func (r *RtVec) Collect(ch chan<- prom.Metric) {
+ r.aggMap.Range(func(_, val interface{}) bool {
+ v := val.(*Rt)
+ res := v.obs.result()
+ for _, m := range r.metrics {
+ ch <- prom.MustNewConstMetric(m.desc(r.opts, r.labelNames), prom.GaugeValue, m.valueFunc(res), buildLabelValues(v.tags, r.labelNames)...)
+ }
+ return true
+ })
+}
+
+func (r *RtVec) Describe(ch chan<- *prom.Desc) {
+ for _, m := range r.metrics {
+ ch <- m.desc(r.opts, r.labelNames)
+ }
+}
diff --git a/metrics/prometheus/rt_vec_test.go b/metrics/prometheus/rt_vec_test.go
new file mode 100644
index 0000000..8964798
--- /dev/null
+++ b/metrics/prometheus/rt_vec_test.go
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package prometheus
+
+import (
+ "reflect"
+ "sync"
+ "testing"
+)
+
+import (
+ prom "github.com/prometheus/client_golang/prometheus"
+
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
+)
+
+func TestRtVecCollect(t *testing.T) {
+ opts := &RtOpts{
+ Name: "request_num",
+ bucketNum: 10,
+ timeWindowSeconds: 120,
+ Help: "Request cost",
+ }
+ labels := []string{"app", "version"}
+ vecs := [2]*RtVec{NewRtVec(opts, labels), NewAggRtVec(opts, labels)}
+ for _, r := range vecs {
+ r.With(map[string]string{"app": "dubbo", "version": "1.0.0"}).Observe(100)
+ ch := make(chan prom.Metric, len(r.metrics))
+ r.Collect(ch)
+ close(ch)
+ assert.Equal(t, len(ch), len(r.metrics))
+ for _, m := range r.metrics {
+ metric, ok := <-ch
+ if !ok {
+ t.Error("not enough metrics")
+ } else {
+ str := metric.Desc().String()
+ assert.Contains(t, str, m.nameSuffix)
+ assert.Contains(t, str, m.helpPrefix)
+ assert.Contains(t, str, "app")
+ assert.Contains(t, str, "version")
+ }
+ }
+ }
+}
+
+func TestRtVecDescribe(t *testing.T) {
+ opts := &RtOpts{
+ Name: "request_num",
+ bucketNum: 10,
+ timeWindowSeconds: 120,
+ Help: "Request cost",
+ }
+ labels := []string{"app", "version"}
+ vecs := [2]*RtVec{NewRtVec(opts, labels), NewAggRtVec(opts, labels)}
+ for _, r := range vecs {
+ ch := make(chan *prom.Desc, len(r.metrics))
+ r.Describe(ch)
+ close(ch)
+ assert.Equal(t, len(ch), len(r.metrics))
+ for _, m := range r.metrics {
+ desc, ok := <-ch
+ if !ok {
+ t.Error(t, "not enough desc")
+ } else {
+ str := desc.String()
+ assert.Contains(t, str, m.nameSuffix)
+ assert.Contains(t, str, m.helpPrefix)
+ assert.Contains(t, str, "app")
+ assert.Contains(t, str, "version")
+ }
+ }
+ }
+}
+
+func TestValueObserve(t *testing.T) {
+ rts := []*Rt{
+ {
+ tags: map[string]string{},
+ obs: &aggResult{agg: aggregate.NewTimeWindowAggregator(10, 10)},
+ },
+ {
+ tags: map[string]string{},
+ obs: &valueResult{val: aggregate.NewResult()},
+ },
+ }
+ want := &aggregate.Result{
+ Avg: 1,
+ Min: 1,
+ Max: 1,
+ Count: 1,
+ Total: 1,
+ Last: 1,
+ }
+ for i, v := range rts {
+ v.Observe(float64(1))
+ r := v.obs.result()
+ if i == 0 {
+ r.Last = 1 // agg result no Last, value is NaN
+ }
+ if !reflect.DeepEqual(r, want) {
+ t.Errorf("Result() = %v, want %v", r, want)
+ }
+ }
+}
+
+func TestRtVecWith(t *testing.T) {
+ opts := &RtOpts{
+ Name: "request_num",
+ bucketNum: 10,
+ timeWindowSeconds: 120,
+ Help: "Request cost",
+ }
+ labels := []string{"app", "version"}
+ vecs := [2]*RtVec{NewRtVec(opts, labels), NewAggRtVec(opts, labels)}
+ tags := map[string]string{"app": "dubbo", "version": "1.0.0"}
+ for _, r := range vecs {
+ first := r.With(tags)
+ second := r.With(tags)
+ assert.True(t, first == second) // init once
+ }
+}
+
+func TestRtVecWithConcurrent(t *testing.T) {
+ opts := &RtOpts{
+ Name: "request_num",
+ bucketNum: 10,
+ timeWindowSeconds: 120,
+ Help: "Request cost",
+ }
+ labels := []string{"app", "version"}
+ labelValues := map[string]string{"app": "dubbo", "version": "1.0.0"}
+ vecs := [2]*RtVec{NewRtVec(opts, labels), NewAggRtVec(opts, labels)}
+ for _, r := range vecs {
+ var wg sync.WaitGroup
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func() {
+ r.With(labelValues).Observe(100)
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ res := r.With(labelValues).(*Rt).obs.result()
+ assert.True(t, res.Count == uint64(10))
+ }
+}
diff --git a/metrics/prometheus/util.go b/metrics/prometheus/util.go
index 4e1a9c8..29e5ebf 100644
--- a/metrics/prometheus/util.go
+++ b/metrics/prometheus/util.go
@@ -24,6 +24,7 @@
import (
"github.com/dubbogo/gost/log/logger"
+
"github.com/prometheus/client_golang/prometheus"
)
diff --git a/metrics/registry/collector.go b/metrics/registry/collector.go
index 7479fd2..63840ed 100644
--- a/metrics/registry/collector.go
+++ b/metrics/registry/collector.go
@@ -101,7 +101,7 @@
// Event is converted to metrics
// Save metrics to the MetricRegistry
rc.regRegistry.Counter(metrics.NewMetricId(NotifyMetricRequests, metrics.GetApplicationLevel())).Inc()
- rc.regRegistry.Histogram(metrics.NewMetricId(NotifyMetricNumLast, metrics.GetApplicationLevel())).Record(float64(event.End.UnixNano()) / float64(time.Second))
+ rc.regRegistry.Histogram(metrics.NewMetricId(NotifyMetricNumLast, metrics.GetApplicationLevel())).Observe(float64(event.End.UnixNano()) / float64(time.Second))
metric := metrics.ComputeIfAbsentCache(dubboNotifyRt, func() interface{} {
return newTimeMetrics(NotifyRtMillisecondsMin, NotifyRtMillisecondsMax, NotifyRtMillisecondsAvg, NotifyRtMillisecondsSum, NotifyRtMillisecondsLast, metrics.GetApplicationLevel(), rc.regRegistry)
}).(metrics.TimeMetric)
diff --git a/metrics/util/aggregate/aggregator.go b/metrics/util/aggregate/aggregator.go
index 0f31f3f..a584cc6 100644
--- a/metrics/util/aggregate/aggregator.go
+++ b/metrics/util/aggregate/aggregator.go
@@ -45,36 +45,49 @@
Max float64
Avg float64
Count uint64
+ Last float64
+}
+
+func NewResult() *Result {
+ return &Result{
+ Min: math.MaxFloat64,
+ Max: math.SmallestNonzeroFloat64,
+ Last: math.NaN(),
+ }
+}
+
+func (r *Result) Update(v float64) {
+ r.Min = math.Min(r.Min, v)
+ r.Max = math.Max(r.Max, v)
+ r.Last = v
+ r.Total += v
+ r.Count++
+}
+
+func (r *Result) Merge(o *Result) {
+ r.Min = math.Min(r.Min, o.Min)
+ r.Max = math.Max(r.Max, o.Max)
+ r.Total += o.Total
+ r.Count += o.Count
+ r.Last = o.Last
+}
+
+func (r *Result) Get() *Result {
+ if r.Count > 0 {
+ r.Avg = r.Total / float64(r.Count)
+ }
+ return r
}
// Result returns the aggregate result of the sliding window by aggregating all panes.
func (t *TimeWindowAggregator) Result() *Result {
t.mux.RLock()
defer t.mux.RUnlock()
-
- res := &Result{}
-
- total := 0.0
- count := uint64(0)
- max := math.SmallestNonzeroFloat64
- min := math.MaxFloat64
-
+ res := NewResult()
for _, v := range t.window.values(time.Now().UnixMilli()) {
- total += v.(*aggregator).total
- count += v.(*aggregator).count
- max = math.Max(max, v.(*aggregator).max)
- min = math.Min(min, v.(*aggregator).min)
+ res.Merge(v.(*Result)) // Last not as expect, but agg result has no Last value
}
-
- if count > 0 {
- res.Avg = total / float64(count)
- res.Count = count
- res.Total = total
- res.Max = max
- res.Min = min
- }
-
- return res
+ return res.Get()
}
// Add adds a value to the sliding window's current pane.
@@ -82,52 +95,9 @@
t.mux.Lock()
defer t.mux.Unlock()
- t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue).value.(*aggregator).add(v)
+ t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue).value.(*Result).Update(v)
}
func (t *TimeWindowAggregator) newEmptyValue() interface{} {
- return newAggregator()
-}
-
-// aggregator is a custom struct to aggregate data.
-//
-// It is NOT concurrent-safe.
-// It aggregates data by calculating the min, max, total and count.
-type aggregator struct {
- min float64
- max float64
- total float64
- count uint64
-}
-
-func newAggregator() *aggregator {
- return &aggregator{
- min: math.MaxFloat64,
- max: math.SmallestNonzeroFloat64,
- total: float64(0),
- count: uint64(0),
- }
-}
-
-func (a *aggregator) add(v float64) {
- a.updateMin(v)
- a.updateMax(v)
- a.updateTotal(v)
- a.updateCount()
-}
-
-func (a *aggregator) updateMin(v float64) {
- a.min = math.Min(a.min, v)
-}
-
-func (a *aggregator) updateMax(v float64) {
- a.max = math.Max(a.max, v)
-}
-
-func (a *aggregator) updateTotal(v float64) {
- a.total += v
-}
-
-func (a *aggregator) updateCount() {
- a.count++
+ return NewResult()
}
diff --git a/metrics/util/aggregate/aggregator_test.go b/metrics/util/aggregate/aggregator_test.go
index d3fa4f6..ab661dc 100644
--- a/metrics/util/aggregate/aggregator_test.go
+++ b/metrics/util/aggregate/aggregator_test.go
@@ -48,7 +48,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if got := timeWindowAggregator.Result(); !reflect.DeepEqual(got, tt.want) {
+ got := timeWindowAggregator.Result()
+ got.Last = 0 // NaN can not equal
+ if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Result() = %v, want %v", got, tt.want)
}
})