feat: sliding window and rt quantile metrics (#2356)

diff --git a/go.mod b/go.mod
index d1da3aa..542118f 100644
--- a/go.mod
+++ b/go.mod
@@ -31,6 +31,7 @@
 	github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99 // indirect
 	github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
 	github.com/hashicorp/vault/sdk v0.7.0
+	github.com/influxdata/tdigest v0.0.1
 	github.com/jinzhu/copier v0.3.5
 	github.com/knadh/koanf v1.5.0
 	github.com/kr/pretty v0.3.0 // indirect
diff --git a/go.sum b/go.sum
index f6f63da..5390019 100644
--- a/go.sum
+++ b/go.sum
@@ -826,6 +826,8 @@
 github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
 github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
+github.com/influxdata/tdigest v0.0.1 h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY=
+github.com/influxdata/tdigest v0.0.1/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y=
 github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag=
 github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
 github.com/jhump/protoreflect v1.6.0/go.mod h1:eaTn3RZAmMBcV0fifFvlm6VHNz3wSkYyXYWUh7ymB74=
@@ -1630,8 +1632,10 @@
 golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
 gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
+gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
 gonum.org/v1/gonum v0.8.2 h1:CCXrcPKiGGotvnN6jfUsKk4rRqm7q09/YbKb5xCEvtM=
 gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0=
+gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
 gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0 h1:OE9mWmgKkjJyEmDAAtGMPjXu+YNeGvK9VTSHY6+Qihc=
 gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
 gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
diff --git a/metrics/prometheus/constant.go b/metrics/prometheus/constant.go
index c640fd7..275fe25 100644
--- a/metrics/prometheus/constant.go
+++ b/metrics/prometheus/constant.go
@@ -30,7 +30,9 @@
 	ipKey              = constant.IpKey
 	methodKey          = constant.MethodKey
 	versionKey         = constant.VersionKey
+)
 
+const (
 	providerField = "provider"
 	consumerField = "consumer"
 
@@ -49,3 +51,7 @@
 	processingField = "processing"
 	succeedField    = "succeed"
 )
+
+var (
+	quantiles = []float64{0.5, 0.9, 0.95, 0.99} // rt quantiles exported as gauges; order must match the generated metric names
+)
diff --git a/metrics/prometheus/metric_set.go b/metrics/prometheus/metric_set.go
index 115c6ca..6c661c7 100644
--- a/metrics/prometheus/metric_set.go
+++ b/metrics/prometheus/metric_set.go
@@ -18,6 +18,8 @@
 package prometheus
 
 import (
+	"fmt"
+	"strconv"
 	"strings"
 )
 
@@ -49,6 +51,7 @@
 	rtMillisecondsSum       *prometheus.CounterVec
 	rtMillisecondsAvg       *GaugeVecWithSyncMap
 	rtMillisecondsLast      *prometheus.GaugeVec
+	rtMillisecondsQuantiles *quantileGaugeVec
 }
 
 type providerMetrics struct {
@@ -64,6 +67,7 @@
 	pm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(providerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames)
 	pm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames)
 	pm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(providerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames)
+	pm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(providerField, quantiles), reporterConfig.Namespace, labelNames, quantiles)
 }
 
 type consumerMetrics struct {
@@ -79,8 +83,10 @@
 	cm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(consumerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames)
 	cm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames)
 	cm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(consumerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames)
+	cm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(consumerField, quantiles), reporterConfig.Namespace, labelNames, quantiles)
 }
 
+// buildMetricsName builds metrics name split by "_".
 func buildMetricsName(args ...string) string {
 	sb := strings.Builder{}
 	for _, arg := range args {
@@ -90,3 +96,14 @@
 	res := strings.TrimPrefix(sb.String(), "_")
 	return res
 }
+
+// buildRTQuantilesMetricsNames returns one rt metric name per quantile, in input order, ending in a "p<q*100>" field (0.95 -> p95).
+func buildRTQuantilesMetricsNames(role string, quantiles []float64) []string {
+	res := make([]string, 0, len(quantiles))
+	for _, q := range quantiles {
+		quantileField := fmt.Sprintf("p%v", strconv.FormatFloat(q*100, 'f', -1, 64))
+		name := buildMetricsName(role, rtField, milliSecondsField, quantileField)
+		res = append(res, name)
+	}
+	return res
+}
diff --git a/metrics/prometheus/model.go b/metrics/prometheus/model.go
index d62734f..7ee94f2 100644
--- a/metrics/prometheus/model.go
+++ b/metrics/prometheus/model.go
@@ -29,6 +29,10 @@
 	"github.com/prometheus/client_golang/prometheus/promauto"
 )
 
+import (
+	"dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
+)
+
 func newHistogramVec(name, namespace string, labels []string) *prometheus.HistogramVec {
 	return prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
@@ -151,7 +155,7 @@
 
 type GaugeVecWithSyncMap struct {
 	GaugeVec *prometheus.GaugeVec
-	SyncMap  *sync.Map
+	SyncMap  *sync.Map // key: labels, value: *atomic.Value
 }
 
 func newAutoGaugeVecWithSyncMap(name, namespace string, labels []string) *GaugeVecWithSyncMap {
@@ -253,3 +257,44 @@
 		}
 	}
 }
+
+type quantileGaugeVec struct {
+	gaugeVecSlice []*prometheus.GaugeVec // one gauge vec per quantile; gaugeVecSlice[i] reports quantiles[i]
+	quantiles     []float64              // quantiles to report, in the same order as gaugeVecSlice
+	syncMap       *sync.Map              // key: labels string, value: *aggregate.TimeWindowQuantile
+}
+
+// newQuantileGaugeVec creates one gauge vec per name; names and quantiles must have the same length and the same order.
+func newQuantileGaugeVec(names []string, namespace string, labels []string, quantiles []float64) *quantileGaugeVec {
+	gvs := make([]*prometheus.GaugeVec, len(names))
+	for i, name := range names {
+		gvs[i] = newAutoGaugeVec(name, namespace, labels)
+	}
+	gv := &quantileGaugeVec{
+		gaugeVecSlice: gvs,
+		quantiles:     quantiles,
+		syncMap:       &sync.Map{},
+	}
+	return gv
+}
+
+// updateQuantile records curValue (rt in ms) for the label set in its sliding-window
+// quantile aggregator, then sets every quantile gauge from the refreshed window.
+func (gv *quantileGaugeVec) updateQuantile(labels *prometheus.Labels, curValue int64) {
+	key := convertLabelsToMapKey(*labels)
+
+	// Fast path: a plain Load avoids building a throw-away aggregator on every call.
+	actual, ok := gv.syncMap.Load(key)
+	if !ok {
+		// First observation for these labels: compression 100, 10 panes, 120 s window.
+		// LoadOrStore keeps racing first callers on one shared aggregator.
+		actual, _ = gv.syncMap.LoadOrStore(key, aggregate.NewTimeWindowQuantile(100, 10, 120))
+	}
+	store := actual.(*aggregate.TimeWindowQuantile)
+	store.Add(float64(curValue))
+
+	// Quantiles returns values in gv.quantiles order, which matches gaugeVecSlice.
+	for i, q := range store.Quantiles(gv.quantiles) {
+		gv.gaugeVecSlice[i].With(*labels).Set(q)
+	}
+}
diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go
index 7ca4398..c123df3 100644
--- a/metrics/prometheus/reporter.go
+++ b/metrics/prometheus/reporter.go
@@ -186,11 +186,13 @@
 		go reporter.provider.rtMillisecondsMin.updateMin(labels, costMs)
 		go reporter.provider.rtMillisecondsMax.updateMax(labels, costMs)
 		go reporter.provider.rtMillisecondsAvg.updateAvg(labels, costMs)
+		go reporter.provider.rtMillisecondsQuantiles.updateQuantile(labels, costMs)
 	case consumerField:
 		go reporter.consumer.rtMillisecondsLast.With(*labels).Set(float64(costMs))
 		go reporter.consumer.rtMillisecondsSum.With(*labels).Add(float64(costMs))
 		go reporter.consumer.rtMillisecondsMin.updateMin(labels, costMs)
 		go reporter.consumer.rtMillisecondsMax.updateMax(labels, costMs)
 		go reporter.consumer.rtMillisecondsAvg.updateAvg(labels, costMs)
+		go reporter.consumer.rtMillisecondsQuantiles.updateQuantile(labels, costMs)
 	}
 }
diff --git a/metrics/util/aggregate/pane.go b/metrics/util/aggregate/pane.go
new file mode 100644
index 0000000..a7b0fba
--- /dev/null
+++ b/metrics/util/aggregate/pane.go
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aggregate
+
+// pane is one fixed-length time slot of a sliding window, spanning startInMs to startInMs+intervalInMs.
+// Its value is an interface{} so any aggregate can be stored; resetTo re-targets a pane in place for reuse.
+type pane struct {
+	startInMs    int64
+	endInMs      int64
+	intervalInMs int64
+	value        interface{}
+}
+
+func newPane(intervalInMs, startInMs int64, value interface{}) *pane {
+	return &pane{
+		startInMs:    startInMs,
+		endInMs:      startInMs + intervalInMs,
+		intervalInMs: intervalInMs,
+		value:        value,
+	}
+}
+
+func (p *pane) resetTo(startInMs int64, value interface{}) {
+	p.startInMs = startInMs
+	p.endInMs = startInMs + p.intervalInMs
+	p.value = value
+}
diff --git a/metrics/util/aggregate/quantile.go b/metrics/util/aggregate/quantile.go
new file mode 100644
index 0000000..1a78ba5
--- /dev/null
+++ b/metrics/util/aggregate/quantile.go
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aggregate
+
+import (
+	"sync"
+	"time"
+)
+
+import (
+	"github.com/influxdata/tdigest"
+)
+
+// TimeWindowQuantile wraps a sliding window around the T-Digest algorithm to estimate quantiles.
+// The window is divided into several panes, and each pane's value is a TDigest instance.
+// Access to the window is guarded by mux.
+type TimeWindowQuantile struct {
+	compression float64
+	window      *slidingWindow
+	mux         sync.RWMutex
+}
+
+// NewTimeWindowQuantile returns a TimeWindowQuantile covering timeWindowSeconds split into paneCount panes; compression is passed to every TDigest.
+func NewTimeWindowQuantile(compression float64, paneCount int, timeWindowSeconds int64) *TimeWindowQuantile {
+	return &TimeWindowQuantile{
+		compression: compression,
+		window:      newSlidingWindow(paneCount, timeWindowSeconds*1000),
+	}
+}
+
+// Quantile returns the q-quantile of the values currently in the sliding window, merging all live panes first.
+func (t *TimeWindowQuantile) Quantile(q float64) float64 {
+	return t.mergeTDigests().Quantile(q)
+}
+
+// Quantiles returns the quantile for each entry of qs, in the same order, computed from a single merge of all live panes.
+func (t *TimeWindowQuantile) Quantiles(qs []float64) []float64 {
+	td := t.mergeTDigests()
+
+	res := make([]float64, len(qs))
+	for i, q := range qs {
+		res[i] = td.Quantile(q)
+	}
+
+	return res
+}
+
+// mergeTDigests merges the TDigests of all non-deprecated panes into one new TDigest.
+func (t *TimeWindowQuantile) mergeTDigests() *tdigest.TDigest {
+	// Exclusive lock, not RLock: TDigest.Centroids may compact the pane's digest in place.
+	t.mux.Lock()
+	defer t.mux.Unlock()
+	td := tdigest.NewWithCompression(t.compression)
+	for _, v := range t.window.values(time.Now().UnixMilli()) {
+		td.AddCentroidList(v.(*tdigest.TDigest).Centroids())
+	}
+	return td
+}
+
+// Add records value with weight 1 in the TDigest of the pane that covers the current time.
+func (t *TimeWindowQuantile) Add(value float64) {
+	t.mux.Lock()
+	defer t.mux.Unlock()
+
+	t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue).value.(*tdigest.TDigest).Add(value, 1)
+}
+
+func (t *TimeWindowQuantile) newEmptyValue() interface{} {
+	return tdigest.NewWithCompression(t.compression) // fresh, empty digest for a new or recycled pane
+}
diff --git a/metrics/util/aggregate/quantile_test.go b/metrics/util/aggregate/quantile_test.go
new file mode 100644
index 0000000..5e82431
--- /dev/null
+++ b/metrics/util/aggregate/quantile_test.go
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aggregate
+
+import "testing"
+
+func TestAddAndQuantile(t1 *testing.T) {
+	timeWindowQuantile := NewTimeWindowQuantile(100, 10, 1) // compression 100, 10 panes, 1 s window
+	for i := 1; i <= 100; i++ { // samples 1..100
+		timeWindowQuantile.Add(float64(i))
+	}
+
+	type args struct {
+		q float64
+	}
+
+	tests := []struct {
+		name string
+		args args
+		want float64
+	}{
+		{
+			name: "Quantile: 0.01",
+			args: args{
+				q: 0.01,
+			},
+			want: 1.5,
+		},
+		{
+			name: "Quantile: 0.99",
+			args: args{
+				q: 0.99,
+			},
+			want: 99.5,
+		},
+	}
+	for _, tt := range tests {
+		t1.Run(tt.name, func(t1 *testing.T) {
+			t := timeWindowQuantile
+			if got := t.Quantile(tt.args.q); got != tt.want { // exact match; NOTE(review): assumes no pane has expired since the Adds above
+				t1.Errorf("Quantile() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
diff --git a/metrics/util/aggregate/sliding_window.go b/metrics/util/aggregate/sliding_window.go
new file mode 100644
index 0000000..feda921
--- /dev/null
+++ b/metrics/util/aggregate/sliding_window.go
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aggregate
+
+// slidingWindow adopts the sliding window algorithm for statistics.
+// It is not thread-safe.
+// A window contains paneCount panes.
+// paneIntervalInMs = intervalInMs / paneCount (integer division).
+type slidingWindow struct {
+	paneCount        int
+	intervalInMs     int64
+	paneIntervalInMs int64
+	paneSlice        []*pane
+}
+
+// newSlidingWindow returns a window of paneCount initially empty pane slots that together cover intervalInMs milliseconds.
+func newSlidingWindow(paneCount int, intervalInMs int64) *slidingWindow {
+	return &slidingWindow{
+		paneCount:        paneCount,
+		intervalInMs:     intervalInMs,
+		paneIntervalInMs: intervalInMs / int64(paneCount),
+		paneSlice:        make([]*pane, paneCount),
+	}
+}
+
+// values returns the value of every pane that is allocated and not deprecated at timeMillis; a negative timeMillis yields an empty slice.
+func (s *slidingWindow) values(timeMillis int64) []interface{} {
+	if timeMillis < 0 {
+		return make([]interface{}, 0)
+	}
+
+	res := make([]interface{}, 0, s.paneCount)
+
+	for _, p := range s.paneSlice {
+		if p == nil || s.isPaneDeprecated(p, timeMillis) {
+			continue
+		}
+		res = append(res, p.value)
+	}
+
+	return res
+}
+
+// isPaneDeprecated reports whether the pane started more than intervalInMs before timeMillis.
+func (s *slidingWindow) isPaneDeprecated(pane *pane, timeMillis int64) bool {
+	return timeMillis-pane.startInMs > s.intervalInMs
+}
+
+// currentPane returns the pane covering timeMillis, creating or recycling its slot as needed; it returns nil when timeMillis is negative.
+func (s *slidingWindow) currentPane(timeMillis int64, newEmptyValue func() interface{}) *pane {
+	if timeMillis < 0 {
+		return nil
+	}
+	paneIdx := s.calcPaneIdx(timeMillis)
+	paneStart := s.calcPaneStart(timeMillis)
+
+	if s.paneSlice[paneIdx] == nil {
+		p := newPane(s.paneIntervalInMs, paneStart, newEmptyValue())
+		s.paneSlice[paneIdx] = p
+		return p
+	} else {
+		p := s.paneSlice[paneIdx]
+		if paneStart == p.startInMs {
+			return p
+		} else if paneStart > p.startInMs {
+			// The pane has deprecated. To avoid the overhead of creating a new instance, reset the original pane directly.
+			p.resetTo(paneStart, newEmptyValue())
+			return p
+		} else {
+			// timeMillis is older than the pane held in this slot: return a detached pane and leave the slot untouched.
+			return newPane(s.paneIntervalInMs, paneStart, newEmptyValue())
+		}
+	}
+}
+
+func (s *slidingWindow) calcPaneIdx(timeMillis int64) int { // slot index in paneSlice for the pane covering timeMillis
+	return int((timeMillis / s.paneIntervalInMs) % int64(s.paneCount)) // modulo in int64: converting the raw quotient to a 32-bit int could wrap negative
+}
+
+func (s *slidingWindow) calcPaneStart(timeMillis int64) int64 {
+	return timeMillis - timeMillis%s.paneIntervalInMs // round down to a multiple of paneIntervalInMs
+}