feat: sliding window and rt quantile metrics (#2356)
diff --git a/go.mod b/go.mod
index d1da3aa..542118f 100644
--- a/go.mod
+++ b/go.mod
@@ -31,6 +31,7 @@
github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/vault/sdk v0.7.0
+ github.com/influxdata/tdigest v0.0.1
github.com/jinzhu/copier v0.3.5
github.com/knadh/koanf v1.5.0
github.com/kr/pretty v0.3.0 // indirect
diff --git a/go.sum b/go.sum
index f6f63da..5390019 100644
--- a/go.sum
+++ b/go.sum
@@ -826,6 +826,8 @@
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
+github.com/influxdata/tdigest v0.0.1 h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY=
+github.com/influxdata/tdigest v0.0.1/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y=
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jhump/protoreflect v1.6.0/go.mod h1:eaTn3RZAmMBcV0fifFvlm6VHNz3wSkYyXYWUh7ymB74=
@@ -1630,8 +1632,10 @@
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
+gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.8.2 h1:CCXrcPKiGGotvnN6jfUsKk4rRqm7q09/YbKb5xCEvtM=
gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0=
+gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0 h1:OE9mWmgKkjJyEmDAAtGMPjXu+YNeGvK9VTSHY6+Qihc=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
diff --git a/metrics/prometheus/constant.go b/metrics/prometheus/constant.go
index c640fd7..275fe25 100644
--- a/metrics/prometheus/constant.go
+++ b/metrics/prometheus/constant.go
@@ -30,7 +30,9 @@
ipKey = constant.IpKey
methodKey = constant.MethodKey
versionKey = constant.VersionKey
+)
+const (
providerField = "provider"
consumerField = "consumer"
@@ -49,3 +51,7 @@
processingField = "processing"
succeedField = "succeed"
)
+
+var (
+ quantiles = []float64{0.5, 0.9, 0.95, 0.99}
+)
diff --git a/metrics/prometheus/metric_set.go b/metrics/prometheus/metric_set.go
index 115c6ca..6c661c7 100644
--- a/metrics/prometheus/metric_set.go
+++ b/metrics/prometheus/metric_set.go
@@ -18,6 +18,8 @@
package prometheus
import (
+ "fmt"
+ "strconv"
"strings"
)
@@ -49,6 +51,7 @@
rtMillisecondsSum *prometheus.CounterVec
rtMillisecondsAvg *GaugeVecWithSyncMap
rtMillisecondsLast *prometheus.GaugeVec
+ rtMillisecondsQuantiles *quantileGaugeVec
}
type providerMetrics struct {
@@ -64,6 +67,7 @@
pm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(providerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(providerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames)
+ pm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(providerField, quantiles), reporterConfig.Namespace, labelNames, quantiles)
}
type consumerMetrics struct {
@@ -79,8 +83,10 @@
cm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(consumerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(consumerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames)
+ cm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(consumerField, quantiles), reporterConfig.Namespace, labelNames, quantiles)
}
+// buildMetricsName builds metrics name split by "_".
func buildMetricsName(args ...string) string {
sb := strings.Builder{}
for _, arg := range args {
@@ -90,3 +96,14 @@
res := strings.TrimPrefix(sb.String(), "_")
return res
}
+
+// buildRTQuantilesMetricsNames is only used for building rt quantiles metric names.
+func buildRTQuantilesMetricsNames(role string, quantiles []float64) []string {
+ res := make([]string, 0, len(quantiles))
+ for _, q := range quantiles {
+ quantileField := fmt.Sprintf("p%v", strconv.FormatFloat(q*100, 'f', -1, 64))
+ name := buildMetricsName(role, rtField, milliSecondsField, quantileField)
+ res = append(res, name)
+ }
+ return res
+}
diff --git a/metrics/prometheus/model.go b/metrics/prometheus/model.go
index d62734f..7ee94f2 100644
--- a/metrics/prometheus/model.go
+++ b/metrics/prometheus/model.go
@@ -29,6 +29,10 @@
"github.com/prometheus/client_golang/prometheus/promauto"
)
+import (
+ "dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
+)
+
func newHistogramVec(name, namespace string, labels []string) *prometheus.HistogramVec {
return prometheus.NewHistogramVec(
prometheus.HistogramOpts{
@@ -151,7 +155,7 @@
type GaugeVecWithSyncMap struct {
GaugeVec *prometheus.GaugeVec
- SyncMap *sync.Map
+ SyncMap *sync.Map // key: labels, value: *atomic.Value
}
func newAutoGaugeVecWithSyncMap(name, namespace string, labels []string) *GaugeVecWithSyncMap {
@@ -253,3 +257,44 @@
}
}
}
+
+type quantileGaugeVec struct {
+ gaugeVecSlice []*prometheus.GaugeVec
+ quantiles []float64
+ syncMap *sync.Map // key: labels string, value: TimeWindowQuantile
+}
+
+// Notice: names and quantiles should be the same length and same order.
+func newQuantileGaugeVec(names []string, namespace string, labels []string, quantiles []float64) *quantileGaugeVec {
+ gvs := make([]*prometheus.GaugeVec, len(names))
+ for i, name := range names {
+ gvs[i] = newAutoGaugeVec(name, namespace, labels)
+ }
+ gv := &quantileGaugeVec{
+ gaugeVecSlice: gvs,
+ quantiles: quantiles,
+ syncMap: &sync.Map{},
+ }
+ return gv
+}
+
+func (gv *quantileGaugeVec) updateQuantile(labels *prometheus.Labels, curValue int64) {
+ key := convertLabelsToMapKey(*labels)
+ cur := aggregate.NewTimeWindowQuantile(100, 10, 120)
+ cur.Add(float64(curValue))
+
+ updateFunc := func(td *aggregate.TimeWindowQuantile) {
+ qs := td.Quantiles(gv.quantiles)
+ for i, q := range qs {
+ gv.gaugeVecSlice[i].With(*labels).Set(q)
+ }
+ }
+
+ if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded {
+ store := actual.(*aggregate.TimeWindowQuantile)
+ store.Add(float64(curValue))
+ updateFunc(store)
+ } else {
+ updateFunc(cur)
+ }
+}
diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go
index 7ca4398..c123df3 100644
--- a/metrics/prometheus/reporter.go
+++ b/metrics/prometheus/reporter.go
@@ -186,11 +186,13 @@
go reporter.provider.rtMillisecondsMin.updateMin(labels, costMs)
go reporter.provider.rtMillisecondsMax.updateMax(labels, costMs)
go reporter.provider.rtMillisecondsAvg.updateAvg(labels, costMs)
+ go reporter.provider.rtMillisecondsQuantiles.updateQuantile(labels, costMs)
case consumerField:
go reporter.consumer.rtMillisecondsLast.With(*labels).Set(float64(costMs))
go reporter.consumer.rtMillisecondsSum.With(*labels).Add(float64(costMs))
go reporter.consumer.rtMillisecondsMin.updateMin(labels, costMs)
go reporter.consumer.rtMillisecondsMax.updateMax(labels, costMs)
go reporter.consumer.rtMillisecondsAvg.updateAvg(labels, costMs)
+ go reporter.consumer.rtMillisecondsQuantiles.updateQuantile(labels, costMs)
}
}
diff --git a/metrics/util/aggregate/pane.go b/metrics/util/aggregate/pane.go
new file mode 100644
index 0000000..a7b0fba
--- /dev/null
+++ b/metrics/util/aggregate/pane.go
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aggregate
+
+// pane represents a window over a period of time.
+// It uses interface{} to store any type of value.
+type pane struct {
+ startInMs int64
+ endInMs int64
+ intervalInMs int64
+ value interface{}
+}
+
+func newPane(intervalInMs, startInMs int64, value interface{}) *pane {
+ return &pane{
+ startInMs: startInMs,
+ endInMs: startInMs + intervalInMs,
+ intervalInMs: intervalInMs,
+ value: value,
+ }
+}
+
+func (p *pane) resetTo(startInMs int64, value interface{}) {
+ p.startInMs = startInMs
+ p.endInMs = startInMs + p.intervalInMs
+ p.value = value
+}
diff --git a/metrics/util/aggregate/quantile.go b/metrics/util/aggregate/quantile.go
new file mode 100644
index 0000000..1a78ba5
--- /dev/null
+++ b/metrics/util/aggregate/quantile.go
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aggregate
+
+import (
+ "sync"
+ "time"
+)
+
+import (
+ "github.com/influxdata/tdigest"
+)
+
+// TimeWindowQuantile wrappers sliding window around T-Digest.
+//
+// It uses T-Digest algorithm to calculate quantile.
+// The window is divided into several panes, and each pane's value is a TDigest instance.
+type TimeWindowQuantile struct {
+ compression float64
+ window *slidingWindow
+ mux sync.RWMutex
+}
+
+func NewTimeWindowQuantile(compression float64, paneCount int, timeWindowSeconds int64) *TimeWindowQuantile {
+ return &TimeWindowQuantile{
+ compression: compression,
+ window: newSlidingWindow(paneCount, timeWindowSeconds*1000),
+ }
+}
+
+// Quantile returns a quantile of the sliding window by merging all panes.
+func (t *TimeWindowQuantile) Quantile(q float64) float64 {
+ return t.mergeTDigests().Quantile(q)
+}
+
+// Quantiles returns quantiles of the sliding window by merging all panes.
+func (t *TimeWindowQuantile) Quantiles(qs []float64) []float64 {
+ td := t.mergeTDigests()
+
+ res := make([]float64, len(qs))
+ for i, q := range qs {
+ res[i] = td.Quantile(q)
+ }
+
+ return res
+}
+
+// mergeTDigests merges all panes' TDigests into one TDigest.
+func (t *TimeWindowQuantile) mergeTDigests() *tdigest.TDigest {
+ t.mux.RLock()
+ defer t.mux.RUnlock()
+
+ td := tdigest.NewWithCompression(t.compression)
+ for _, v := range t.window.values(time.Now().UnixMilli()) {
+ td.AddCentroidList(v.(*tdigest.TDigest).Centroids())
+ }
+ return td
+}
+
+// Add adds a value to the sliding window's current pane.
+func (t *TimeWindowQuantile) Add(value float64) {
+ t.mux.Lock()
+ defer t.mux.Unlock()
+
+ t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue).value.(*tdigest.TDigest).Add(value, 1)
+}
+
+func (t *TimeWindowQuantile) newEmptyValue() interface{} {
+ return tdigest.NewWithCompression(t.compression)
+}
diff --git a/metrics/util/aggregate/quantile_test.go b/metrics/util/aggregate/quantile_test.go
new file mode 100644
index 0000000..5e82431
--- /dev/null
+++ b/metrics/util/aggregate/quantile_test.go
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aggregate
+
+import "testing"
+
+func TestAddAndQuantile(t1 *testing.T) {
+ timeWindowQuantile := NewTimeWindowQuantile(100, 10, 1)
+ for i := 1; i <= 100; i++ {
+ timeWindowQuantile.Add(float64(i))
+ }
+
+ type args struct {
+ q float64
+ }
+
+ tests := []struct {
+ name string
+ args args
+ want float64
+ }{
+ {
+ name: "Quantile: 0.01",
+ args: args{
+ q: 0.01,
+ },
+ want: 1.5,
+ },
+ {
+ name: "Quantile: 0.99",
+ args: args{
+ q: 0.99,
+ },
+ want: 99.5,
+ },
+ }
+ for _, tt := range tests {
+ t1.Run(tt.name, func(t1 *testing.T) {
+ t := timeWindowQuantile
+ if got := t.Quantile(tt.args.q); got != tt.want {
+ t1.Errorf("Quantile() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
diff --git a/metrics/util/aggregate/sliding_window.go b/metrics/util/aggregate/sliding_window.go
new file mode 100644
index 0000000..feda921
--- /dev/null
+++ b/metrics/util/aggregate/sliding_window.go
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aggregate
+
+// SlidingWindow adopts sliding window algorithm for statistics.
+//
+// It is not thread-safe.
+// A window contains paneCount panes.
+// intervalInMs = paneCount * paneIntervalInMs.
+type slidingWindow struct {
+ paneCount int
+ intervalInMs int64
+ paneIntervalInMs int64
+ paneSlice []*pane
+}
+
+func newSlidingWindow(paneCount int, intervalInMs int64) *slidingWindow {
+ return &slidingWindow{
+ paneCount: paneCount,
+ intervalInMs: intervalInMs,
+ paneIntervalInMs: intervalInMs / int64(paneCount),
+ paneSlice: make([]*pane, paneCount),
+ }
+}
+
+// values get all values from the slidingWindow's paneSlice.
+func (s *slidingWindow) values(timeMillis int64) []interface{} {
+ if timeMillis < 0 {
+ return make([]interface{}, 0)
+ }
+
+ res := make([]interface{}, 0, s.paneCount)
+
+ for _, p := range s.paneSlice {
+ if p == nil || s.isPaneDeprecated(p, timeMillis) {
+ continue
+ }
+ res = append(res, p.value)
+ }
+
+ return res
+}
+
+// isPaneDeprecated checks if the specified pane is deprecated at the specified timeMillis
+func (s *slidingWindow) isPaneDeprecated(pane *pane, timeMillis int64) bool {
+ return timeMillis-pane.startInMs > s.intervalInMs
+}
+
+// currentPane get the pane at the specified timestamp in milliseconds.
+func (s *slidingWindow) currentPane(timeMillis int64, newEmptyValue func() interface{}) *pane {
+ if timeMillis < 0 {
+ return nil
+ }
+ paneIdx := s.calcPaneIdx(timeMillis)
+ paneStart := s.calcPaneStart(timeMillis)
+
+ if s.paneSlice[paneIdx] == nil {
+ p := newPane(s.paneIntervalInMs, paneStart, newEmptyValue())
+ s.paneSlice[paneIdx] = p
+ return p
+ } else {
+ p := s.paneSlice[paneIdx]
+ if paneStart == p.startInMs {
+ return p
+ } else if paneStart > p.startInMs {
+ // The pane has deprecated. To avoid the overhead of creating a new instance, reset the original pane directly.
+ p.resetTo(paneStart, newEmptyValue())
+ return p
+ } else {
+ // The specified timestamp has passed.
+ return newPane(s.paneIntervalInMs, paneStart, newEmptyValue())
+ }
+ }
+}
+
+func (s *slidingWindow) calcPaneIdx(timeMillis int64) int {
+ return int(timeMillis/s.paneIntervalInMs) % s.paneCount
+}
+
+func (s *slidingWindow) calcPaneStart(timeMillis int64) int64 {
+ return timeMillis - timeMillis%s.paneIntervalInMs
+}