Merge branch 'main' into feature-metrics
diff --git a/metrics/prometheus/constant.go b/metrics/prometheus/constant.go
index 275fe25..41904db 100644
--- a/metrics/prometheus/constant.go
+++ b/metrics/prometheus/constant.go
@@ -36,6 +36,7 @@
providerField = "provider"
consumerField = "consumer"
+ qpsField = "qps"
requestsField = "requests"
rtField = "rt"
@@ -48,6 +49,7 @@
lastField = "last"
totalField = "total"
+ aggregateField = "aggregate"
processingField = "processing"
succeedField = "succeed"
)
diff --git a/metrics/prometheus/metric_set.go b/metrics/prometheus/metric_set.go
index 6c661c7..5d3caf2 100644
--- a/metrics/prometheus/metric_set.go
+++ b/metrics/prometheus/metric_set.go
@@ -43,15 +43,19 @@
}
+// rpcCommonMetrics holds the request and response-time (rt) collectors whose
+// fields are populated by providerMetrics.init and consumerMetrics.init.
type rpcCommonMetrics struct {
- requestsTotal *prometheus.CounterVec
- requestsProcessingTotal *prometheus.GaugeVec
- requestsSucceedTotal *prometheus.CounterVec
- rtMillisecondsMin *GaugeVecWithSyncMap
- rtMillisecondsMax *GaugeVecWithSyncMap
- rtMillisecondsSum *prometheus.CounterVec
- rtMillisecondsAvg *GaugeVecWithSyncMap
- rtMillisecondsLast *prometheus.GaugeVec
- rtMillisecondsQuantiles *quantileGaugeVec
+ qpsTotal *qpsGaugeVec // refreshed through incQpsTotal
+ requestsTotal *prometheus.CounterVec
+ requestsTotalAggregate *aggregateCounterGaugeVec // time-window companion of requestsTotal
+ requestsProcessingTotal *prometheus.GaugeVec
+ requestsSucceedTotal *prometheus.CounterVec
+ requestsSucceedTotalAggregate *aggregateCounterGaugeVec // time-window companion of requestsSucceedTotal
+ rtMillisecondsMin *GaugeVecWithSyncMap
+ rtMillisecondsMax *GaugeVecWithSyncMap
+ rtMillisecondsSum *prometheus.CounterVec
+ rtMillisecondsAvg *GaugeVecWithSyncMap
+ rtMillisecondsLast *prometheus.GaugeVec
+ rtMillisecondsQuantiles *quantileGaugeVec
+ rtMillisecondsAggregate *aggregateFunctionsGaugeVec // min/max/avg gauges fed by a TimeWindowAggregator
}
type providerMetrics struct {
@@ -59,15 +63,25 @@
}
+// init creates every provider-side collector. Metric names are built with
+// buildMetricsName from providerField plus the per-metric fields, and all
+// collectors share reporterConfig.Namespace and labelNames.
func (pm *providerMetrics) init(reporterConfig *metrics.ReporterConfig) {
+ pm.qpsTotal = newQpsGaugeVec(buildMetricsName(providerField, qpsField, totalField), reporterConfig.Namespace, labelNames)
pm.requestsTotal = newAutoCounterVec(buildMetricsName(providerField, requestsField, totalField), reporterConfig.Namespace, labelNames)
+ pm.requestsTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(providerField, requestsField, totalField, aggregateField), reporterConfig.Namespace, labelNames)
pm.requestsProcessingTotal = newAutoGaugeVec(buildMetricsName(providerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames)
pm.requestsSucceedTotal = newAutoCounterVec(buildMetricsName(providerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames)
+ pm.requestsSucceedTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(providerField, requestsField, succeedField, totalField, aggregateField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsMin = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, minField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsMax = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, maxField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(providerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(providerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(providerField, quantiles), reporterConfig.Namespace, labelNames, quantiles)
+ // NOTE(review): the aggregate names put min/max/avg BEFORE milliSecondsField,
+ // whereas rtMillisecondsMin/Max/Avg above put it after. Presumably this
+ // mirrors an upstream naming scheme - confirm it is intentional.
+ pm.rtMillisecondsAggregate = newAggregateFunctionsGaugeVec(
+ buildMetricsName(providerField, rtField, minField, milliSecondsField, aggregateField),
+ buildMetricsName(providerField, rtField, maxField, milliSecondsField, aggregateField),
+ buildMetricsName(providerField, rtField, avgField, milliSecondsField, aggregateField),
+ reporterConfig.Namespace,
+ labelNames,
+ )
}
type consumerMetrics struct {
@@ -75,15 +89,25 @@
}
+// init creates every consumer-side collector. Metric names are built with
+// buildMetricsName from consumerField plus the per-metric fields, and all
+// collectors share reporterConfig.Namespace and labelNames.
func (cm *consumerMetrics) init(reporterConfig *metrics.ReporterConfig) {
+ cm.qpsTotal = newQpsGaugeVec(buildMetricsName(consumerField, qpsField, totalField), reporterConfig.Namespace, labelNames)
cm.requestsTotal = newAutoCounterVec(buildMetricsName(consumerField, requestsField, totalField), reporterConfig.Namespace, labelNames)
+ cm.requestsTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(consumerField, requestsField, totalField, aggregateField), reporterConfig.Namespace, labelNames)
cm.requestsProcessingTotal = newAutoGaugeVec(buildMetricsName(consumerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames)
cm.requestsSucceedTotal = newAutoCounterVec(buildMetricsName(consumerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames)
+ cm.requestsSucceedTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(consumerField, requestsField, succeedField, totalField, aggregateField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsMin = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, minField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsMax = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, maxField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(consumerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(consumerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(consumerField, quantiles), reporterConfig.Namespace, labelNames, quantiles)
+ // NOTE(review): the aggregate names put min/max/avg BEFORE milliSecondsField,
+ // whereas rtMillisecondsMin/Max/Avg above put it after. Presumably this
+ // mirrors an upstream naming scheme - confirm it is intentional.
+ cm.rtMillisecondsAggregate = newAggregateFunctionsGaugeVec(
+ buildMetricsName(consumerField, rtField, minField, milliSecondsField, aggregateField),
+ buildMetricsName(consumerField, rtField, maxField, milliSecondsField, aggregateField),
+ buildMetricsName(consumerField, rtField, avgField, milliSecondsField, aggregateField),
+ reporterConfig.Namespace,
+ labelNames,
+ )
}
// buildMetricsName builds metrics name split by "_".
diff --git a/metrics/prometheus/model.go b/metrics/prometheus/model.go
index 7ee94f2..3efd214 100644
--- a/metrics/prometheus/model.go
+++ b/metrics/prometheus/model.go
@@ -298,3 +298,92 @@
updateFunc(cur)
}
}
+
+// qpsGaugeVec pairs a Prometheus gauge vector with one time-window counter per
+// label set; updateQps derives the exported gauge value from that counter.
+type qpsGaugeVec struct {
+ gaugeVec *prometheus.GaugeVec // exported gauge, one series per label set
+ syncMap *sync.Map // key: labels string (convertLabelsToMapKey), value: *aggregate.TimeWindowCounter
+}
+
+// newQpsGaugeVec builds a qpsGaugeVec whose gauge is created by newAutoGaugeVec
+// under the given name, namespace and label names, with an empty counter map.
+func newQpsGaugeVec(name, namespace string, labels []string) *qpsGaugeVec {
+	vec := new(qpsGaugeVec)
+	vec.gaugeVec = newAutoGaugeVec(name, namespace, labels)
+	vec.syncMap = new(sync.Map)
+	return vec
+}
+
+// updateQps records one request for the given label set and refreshes the QPS
+// gauge with Count() / LivedSeconds() of that label set's time-window counter.
+//
+// labels must be non-nil; it is dereferenced both to build the map key and to
+// select the gauge series.
+func (gv *qpsGaugeVec) updateQps(labels *prometheus.Labels) {
+	key := convertLabelsToMapKey(*labels)
+
+	// Look the counter up first: the common case is an existing key, and the
+	// previous unconditional NewTimeWindowCounter allocated a throwaway window
+	// on every single request.
+	value, found := gv.syncMap.Load(key)
+	if !found {
+		// LoadOrStore still guarantees a single counter per key when several
+		// goroutines hit a new label set at the same time.
+		// NOTE(review): (10, 120) is presumably pane count and window seconds -
+		// confirm against the aggregate package.
+		value, _ = gv.syncMap.LoadOrStore(key, aggregate.NewTimeWindowCounter(10, 120))
+	}
+	counter := value.(*aggregate.TimeWindowCounter)
+	counter.Inc()
+
+	seconds := counter.LivedSeconds()
+	if seconds <= 0 {
+		// A zero divisor would export +Inf/NaN instead of a rate, so leave the
+		// gauge untouched until the window has a measurable lifetime.
+		// NOTE(review): whether LivedSeconds can actually return 0 depends on
+		// the aggregate package - confirm.
+		return
+	}
+	gv.gaugeVec.With(*labels).Set(counter.Count() / float64(seconds))
+}
+
+// aggregateCounterGaugeVec pairs a Prometheus gauge vector with one time-window
+// counter per label set; inc publishes the counter's Count() as the gauge value.
+type aggregateCounterGaugeVec struct {
+ gaugeVec *prometheus.GaugeVec // exported gauge, one series per label set
+ syncMap *sync.Map // key: labels string (convertLabelsToMapKey), value: *aggregate.TimeWindowCounter
+}
+
+// newAggregateCounterGaugeVec builds an aggregateCounterGaugeVec whose gauge is
+// created by newAutoGaugeVec under the given name, namespace and label names,
+// with an empty counter map.
+func newAggregateCounterGaugeVec(name, namespace string, labels []string) *aggregateCounterGaugeVec {
+	vec := new(aggregateCounterGaugeVec)
+	vec.gaugeVec = newAutoGaugeVec(name, namespace, labels)
+	vec.syncMap = new(sync.Map)
+	return vec
+}
+
+// inc adds one event to the time-window counter of the given label set and
+// publishes the counter's current Count() on the matching gauge series.
+//
+// labels must be non-nil; it is dereferenced both to build the map key and to
+// select the gauge series.
+func (gv *aggregateCounterGaugeVec) inc(labels *prometheus.Labels) {
+	key := convertLabelsToMapKey(*labels)
+
+	// Look the counter up first: the common case is an existing key, and the
+	// previous unconditional NewTimeWindowCounter allocated a throwaway window
+	// on every call.
+	value, found := gv.syncMap.Load(key)
+	if !found {
+		// LoadOrStore still guarantees a single counter per key when several
+		// goroutines hit a new label set at the same time.
+		// NOTE(review): (10, 120) is presumably pane count and window seconds -
+		// confirm against the aggregate package.
+		value, _ = gv.syncMap.LoadOrStore(key, aggregate.NewTimeWindowCounter(10, 120))
+	}
+	counter := value.(*aggregate.TimeWindowCounter)
+	counter.Inc()
+	gv.gaugeVec.With(*labels).Set(counter.Count())
+}
+
+// aggregateFunctionsGaugeVec exposes the Min, Max and Avg of a per-label-set
+// time-window aggregator as three separate Prometheus gauge vectors.
+type aggregateFunctionsGaugeVec struct {
+ min *prometheus.GaugeVec // receives Result().Min
+ max *prometheus.GaugeVec // receives Result().Max
+ avg *prometheus.GaugeVec // receives Result().Avg
+ syncMap *sync.Map // key: labels string (convertLabelsToMapKey), value: *aggregate.TimeWindowAggregator
+}
+
+// newAggregateFunctionsGaugeVec builds an aggregateFunctionsGaugeVec with one
+// gauge vector per aggregate function (min, max, avg), each created by
+// newAutoGaugeVec in the same namespace with the same label names, and an
+// empty aggregator map.
+func newAggregateFunctionsGaugeVec(minName, maxName, avgName, namespace string, labels []string) *aggregateFunctionsGaugeVec {
+	vec := new(aggregateFunctionsGaugeVec)
+	vec.min = newAutoGaugeVec(minName, namespace, labels)
+	vec.max = newAutoGaugeVec(maxName, namespace, labels)
+	vec.avg = newAutoGaugeVec(avgName, namespace, labels)
+	vec.syncMap = new(sync.Map)
+	return vec
+}
+
+// update feeds curValue into the time-window aggregator of the given label set
+// and republishes the aggregator's Min, Max and Avg on the three gauges.
+//
+// labels must be non-nil; it is dereferenced both to build the map key and to
+// select the gauge series. curValue is converted to float64 before being added.
+func (gv *aggregateFunctionsGaugeVec) update(labels *prometheus.Labels, curValue int64) {
+	key := convertLabelsToMapKey(*labels)
+
+	// Look the aggregator up first: the common case is an existing key, and
+	// the previous unconditional NewTimeWindowAggregator allocated a throwaway
+	// window on every call.
+	value, found := gv.syncMap.Load(key)
+	if !found {
+		// LoadOrStore still guarantees a single aggregator per key when
+		// several goroutines hit a new label set at the same time.
+		// NOTE(review): (10, 120) is presumably pane count and window seconds -
+		// confirm against the aggregate package.
+		value, _ = gv.syncMap.LoadOrStore(key, aggregate.NewTimeWindowAggregator(10, 120))
+	}
+	aggregator := value.(*aggregate.TimeWindowAggregator)
+	aggregator.Add(float64(curValue))
+
+	result := aggregator.Result()
+	gv.min.With(*labels).Set(result.Min)
+	gv.max.With(*labels).Set(result.Max)
+	gv.avg.With(*labels).Set(result.Avg)
+}
diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go
index c123df3..715fd97 100644
--- a/metrics/prometheus/reporter.go
+++ b/metrics/prometheus/reporter.go
@@ -116,7 +116,7 @@
return
}
labels := buildLabels(url)
-
+ reporter.incQpsTotal(role, &labels)
reporter.incRequestsProcessingTotal(role, &labels)
}
@@ -142,12 +142,23 @@
}
}
+// incQpsTotal forwards the label set to the QPS gauge of the side named by
+// role (providerField or consumerField). Any other role is ignored.
+func (reporter *PrometheusReporter) incQpsTotal(role string, labels *prometheus.Labels) {
+	if role == providerField {
+		reporter.provider.qpsTotal.updateQps(labels)
+		return
+	}
+	if role == consumerField {
+		reporter.consumer.qpsTotal.updateQps(labels)
+	}
+}
+
+// incRequestsTotal increments the requests-total counter of the side named by
+// role and also bumps that side's time-window aggregate gauge. Roles other
+// than providerField and consumerField are ignored.
func (reporter *PrometheusReporter) incRequestsTotal(role string, labels *prometheus.Labels) {
switch role {
case providerField:
reporter.provider.requestsTotal.With(*labels).Inc()
+ reporter.provider.requestsTotalAggregate.inc(labels) // keep the aggregate gauge in step with the counter
case consumerField:
reporter.consumer.requestsTotal.With(*labels).Inc()
+ reporter.consumer.requestsTotalAggregate.inc(labels) // keep the aggregate gauge in step with the counter
}
}
@@ -173,8 +184,10 @@
switch role {
case providerField:
reporter.provider.requestsSucceedTotal.With(*labels).Inc()
+ reporter.provider.requestsSucceedTotalAggregate.inc(labels)
case consumerField:
reporter.consumer.requestsSucceedTotal.With(*labels).Inc()
+ reporter.consumer.requestsSucceedTotalAggregate.inc(labels)
}
}
@@ -187,6 +200,7 @@
go reporter.provider.rtMillisecondsMax.updateMax(labels, costMs)
go reporter.provider.rtMillisecondsAvg.updateAvg(labels, costMs)
go reporter.provider.rtMillisecondsQuantiles.updateQuantile(labels, costMs)
+ go reporter.provider.rtMillisecondsAggregate.update(labels, costMs)
case consumerField:
go reporter.consumer.rtMillisecondsLast.With(*labels).Set(float64(costMs))
go reporter.consumer.rtMillisecondsSum.With(*labels).Add(float64(costMs))
@@ -194,5 +208,6 @@
go reporter.consumer.rtMillisecondsMax.updateMax(labels, costMs)
go reporter.consumer.rtMillisecondsAvg.updateAvg(labels, costMs)
go reporter.consumer.rtMillisecondsQuantiles.updateQuantile(labels, costMs)
+ go reporter.consumer.rtMillisecondsAggregate.update(labels, costMs)
}
}