feat: add some metrics (#2362)

diff --git a/metrics/prometheus/constant.go b/metrics/prometheus/constant.go
index 275fe25..41904db 100644
--- a/metrics/prometheus/constant.go
+++ b/metrics/prometheus/constant.go
@@ -36,6 +36,7 @@
 	providerField = "provider"
 	consumerField = "consumer"
 
+	qpsField      = "qps"
 	requestsField = "requests"
 	rtField       = "rt"
 
@@ -48,6 +49,7 @@
 	lastField = "last"
 
 	totalField      = "total"
+	aggregateField  = "aggregate"
 	processingField = "processing"
 	succeedField    = "succeed"
 )
diff --git a/metrics/prometheus/metric_set.go b/metrics/prometheus/metric_set.go
index 6c661c7..5d3caf2 100644
--- a/metrics/prometheus/metric_set.go
+++ b/metrics/prometheus/metric_set.go
@@ -43,15 +43,19 @@
 }
 
 type rpcCommonMetrics struct {
-	requestsTotal           *prometheus.CounterVec
-	requestsProcessingTotal *prometheus.GaugeVec
-	requestsSucceedTotal    *prometheus.CounterVec
-	rtMillisecondsMin       *GaugeVecWithSyncMap
-	rtMillisecondsMax       *GaugeVecWithSyncMap
-	rtMillisecondsSum       *prometheus.CounterVec
-	rtMillisecondsAvg       *GaugeVecWithSyncMap
-	rtMillisecondsLast      *prometheus.GaugeVec
-	rtMillisecondsQuantiles *quantileGaugeVec
+	qpsTotal                      *qpsGaugeVec
+	requestsTotal                 *prometheus.CounterVec
+	requestsTotalAggregate        *aggregateCounterGaugeVec
+	requestsProcessingTotal       *prometheus.GaugeVec
+	requestsSucceedTotal          *prometheus.CounterVec
+	requestsSucceedTotalAggregate *aggregateCounterGaugeVec
+	rtMillisecondsMin             *GaugeVecWithSyncMap
+	rtMillisecondsMax             *GaugeVecWithSyncMap
+	rtMillisecondsSum             *prometheus.CounterVec
+	rtMillisecondsAvg             *GaugeVecWithSyncMap
+	rtMillisecondsLast            *prometheus.GaugeVec
+	rtMillisecondsQuantiles       *quantileGaugeVec
+	rtMillisecondsAggregate       *aggregateFunctionsGaugeVec
 }
 
 type providerMetrics struct {
@@ -59,15 +63,25 @@
 }
 
 func (pm *providerMetrics) init(reporterConfig *metrics.ReporterConfig) {
+	pm.qpsTotal = newQpsGaugeVec(buildMetricsName(providerField, qpsField, totalField), reporterConfig.Namespace, labelNames)
 	pm.requestsTotal = newAutoCounterVec(buildMetricsName(providerField, requestsField, totalField), reporterConfig.Namespace, labelNames)
+	pm.requestsTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(providerField, requestsField, totalField, aggregateField), reporterConfig.Namespace, labelNames)
 	pm.requestsProcessingTotal = newAutoGaugeVec(buildMetricsName(providerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames)
 	pm.requestsSucceedTotal = newAutoCounterVec(buildMetricsName(providerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames)
+	pm.requestsSucceedTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(providerField, requestsField, succeedField, totalField, aggregateField), reporterConfig.Namespace, labelNames)
 	pm.rtMillisecondsMin = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, minField), reporterConfig.Namespace, labelNames)
 	pm.rtMillisecondsMax = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, maxField), reporterConfig.Namespace, labelNames)
 	pm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(providerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames)
 	pm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames)
 	pm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(providerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames)
 	pm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(providerField, quantiles), reporterConfig.Namespace, labelNames, quantiles)
+	pm.rtMillisecondsAggregate = newAggregateFunctionsGaugeVec(
+		buildMetricsName(providerField, rtField, minField, milliSecondsField, aggregateField),
+		buildMetricsName(providerField, rtField, maxField, milliSecondsField, aggregateField),
+		buildMetricsName(providerField, rtField, avgField, milliSecondsField, aggregateField),
+		reporterConfig.Namespace,
+		labelNames,
+	)
 }
 
 type consumerMetrics struct {
@@ -75,15 +89,25 @@
 }
 
 func (cm *consumerMetrics) init(reporterConfig *metrics.ReporterConfig) {
+	cm.qpsTotal = newQpsGaugeVec(buildMetricsName(consumerField, qpsField, totalField), reporterConfig.Namespace, labelNames)
 	cm.requestsTotal = newAutoCounterVec(buildMetricsName(consumerField, requestsField, totalField), reporterConfig.Namespace, labelNames)
+	cm.requestsTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(consumerField, requestsField, totalField, aggregateField), reporterConfig.Namespace, labelNames)
 	cm.requestsProcessingTotal = newAutoGaugeVec(buildMetricsName(consumerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames)
 	cm.requestsSucceedTotal = newAutoCounterVec(buildMetricsName(consumerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames)
+	cm.requestsSucceedTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(consumerField, requestsField, succeedField, totalField, aggregateField), reporterConfig.Namespace, labelNames)
 	cm.rtMillisecondsMin = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, minField), reporterConfig.Namespace, labelNames)
 	cm.rtMillisecondsMax = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, maxField), reporterConfig.Namespace, labelNames)
 	cm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(consumerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames)
 	cm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames)
 	cm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(consumerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames)
 	cm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(consumerField, quantiles), reporterConfig.Namespace, labelNames, quantiles)
+	cm.rtMillisecondsAggregate = newAggregateFunctionsGaugeVec(
+		buildMetricsName(consumerField, rtField, minField, milliSecondsField, aggregateField),
+		buildMetricsName(consumerField, rtField, maxField, milliSecondsField, aggregateField),
+		buildMetricsName(consumerField, rtField, avgField, milliSecondsField, aggregateField),
+		reporterConfig.Namespace,
+		labelNames,
+	)
 }
 
 // buildMetricsName builds metrics name split by "_".
diff --git a/metrics/prometheus/model.go b/metrics/prometheus/model.go
index 7ee94f2..3efd214 100644
--- a/metrics/prometheus/model.go
+++ b/metrics/prometheus/model.go
@@ -298,3 +298,92 @@
 		updateFunc(cur)
 	}
 }
+
+type qpsGaugeVec struct {
+	gaugeVec *prometheus.GaugeVec
+	syncMap  *sync.Map // key: labels string, value: TimeWindowCounter
+}
+
+func newQpsGaugeVec(name, namespace string, labels []string) *qpsGaugeVec {
+	return &qpsGaugeVec{
+		gaugeVec: newAutoGaugeVec(name, namespace, labels),
+		syncMap:  &sync.Map{},
+	}
+}
+
+func (gv *qpsGaugeVec) updateQps(labels *prometheus.Labels) {
+	key := convertLabelsToMapKey(*labels)
+	cur := aggregate.NewTimeWindowCounter(10, 120)
+	cur.Inc()
+
+	if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded {
+		store := actual.(*aggregate.TimeWindowCounter)
+		store.Inc()
+		gv.gaugeVec.With(*labels).Set(store.Count() / float64(store.LivedSeconds()))
+	} else {
+		gv.gaugeVec.With(*labels).Set(cur.Count() / float64(cur.LivedSeconds()))
+	}
+}
+
+type aggregateCounterGaugeVec struct {
+	gaugeVec *prometheus.GaugeVec
+	syncMap  *sync.Map // key: labels string, value: TimeWindowCounter
+}
+
+func newAggregateCounterGaugeVec(name, namespace string, labels []string) *aggregateCounterGaugeVec {
+	return &aggregateCounterGaugeVec{
+		gaugeVec: newAutoGaugeVec(name, namespace, labels),
+		syncMap:  &sync.Map{},
+	}
+}
+
+func (gv *aggregateCounterGaugeVec) inc(labels *prometheus.Labels) {
+	key := convertLabelsToMapKey(*labels)
+	cur := aggregate.NewTimeWindowCounter(10, 120)
+	cur.Inc()
+
+	if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded {
+		store := actual.(*aggregate.TimeWindowCounter)
+		store.Inc()
+		gv.gaugeVec.With(*labels).Set(store.Count())
+	} else {
+		gv.gaugeVec.With(*labels).Set(cur.Count())
+	}
+}
+
+type aggregateFunctionsGaugeVec struct {
+	min     *prometheus.GaugeVec
+	max     *prometheus.GaugeVec
+	avg     *prometheus.GaugeVec
+	syncMap *sync.Map // key: labels string, value: TimeWindowAggregator
+}
+
+func newAggregateFunctionsGaugeVec(minName, maxName, avgName, namespace string, labels []string) *aggregateFunctionsGaugeVec {
+	return &aggregateFunctionsGaugeVec{
+		min:     newAutoGaugeVec(minName, namespace, labels),
+		max:     newAutoGaugeVec(maxName, namespace, labels),
+		avg:     newAutoGaugeVec(avgName, namespace, labels),
+		syncMap: &sync.Map{},
+	}
+}
+
+func (gv *aggregateFunctionsGaugeVec) update(labels *prometheus.Labels, curValue int64) {
+	key := convertLabelsToMapKey(*labels)
+	cur := aggregate.NewTimeWindowAggregator(10, 120)
+	cur.Add(float64(curValue))
+
+	updateFunc := func(aggregator *aggregate.TimeWindowAggregator) {
+		result := aggregator.Result()
+		gv.min.With(*labels).Set(result.Min)
+		gv.max.With(*labels).Set(result.Max)
+		gv.avg.With(*labels).Set(result.Avg)
+	}
+
+	if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded {
+		store := actual.(*aggregate.TimeWindowAggregator)
+		store.Add(float64(curValue))
+		updateFunc(store)
+	} else {
+		updateFunc(cur)
+	}
+}
diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go
index c123df3..715fd97 100644
--- a/metrics/prometheus/reporter.go
+++ b/metrics/prometheus/reporter.go
@@ -116,7 +116,7 @@
 		return
 	}
 	labels := buildLabels(url)
-
+	reporter.incQpsTotal(role, &labels)
 	reporter.incRequestsProcessingTotal(role, &labels)
 }
 
@@ -142,12 +142,23 @@
 	}
 }
 
+func (reporter *PrometheusReporter) incQpsTotal(role string, labels *prometheus.Labels) {
+	switch role {
+	case providerField:
+		reporter.provider.qpsTotal.updateQps(labels)
+	case consumerField:
+		reporter.consumer.qpsTotal.updateQps(labels)
+	}
+}
+
 func (reporter *PrometheusReporter) incRequestsTotal(role string, labels *prometheus.Labels) {
 	switch role {
 	case providerField:
 		reporter.provider.requestsTotal.With(*labels).Inc()
+		reporter.provider.requestsTotalAggregate.inc(labels)
 	case consumerField:
 		reporter.consumer.requestsTotal.With(*labels).Inc()
+		reporter.consumer.requestsTotalAggregate.inc(labels)
 	}
 }
 
@@ -173,8 +184,10 @@
 	switch role {
 	case providerField:
 		reporter.provider.requestsSucceedTotal.With(*labels).Inc()
+		reporter.provider.requestsSucceedTotalAggregate.inc(labels)
 	case consumerField:
 		reporter.consumer.requestsSucceedTotal.With(*labels).Inc()
+		reporter.consumer.requestsSucceedTotalAggregate.inc(labels)
 	}
 }
 
@@ -187,6 +200,7 @@
 		go reporter.provider.rtMillisecondsMax.updateMax(labels, costMs)
 		go reporter.provider.rtMillisecondsAvg.updateAvg(labels, costMs)
 		go reporter.provider.rtMillisecondsQuantiles.updateQuantile(labels, costMs)
+		go reporter.provider.rtMillisecondsAggregate.update(labels, costMs)
 	case consumerField:
 		go reporter.consumer.rtMillisecondsLast.With(*labels).Set(float64(costMs))
 		go reporter.consumer.rtMillisecondsSum.With(*labels).Add(float64(costMs))
@@ -194,5 +208,6 @@
 		go reporter.consumer.rtMillisecondsMax.updateMax(labels, costMs)
 		go reporter.consumer.rtMillisecondsAvg.updateAvg(labels, costMs)
 		go reporter.consumer.rtMillisecondsQuantiles.updateQuantile(labels, costMs)
+		go reporter.consumer.rtMillisecondsAggregate.update(labels, costMs)
 	}
 }