refactor: make rt metric more performance and easy to use (#2376)

diff --git a/common/host_util.go b/common/host_util.go
index 5c411ab..eac0ac1 100644
--- a/common/host_util.go
+++ b/common/host_util.go
@@ -31,8 +31,10 @@
 	"dubbo.apache.org/dubbo-go/v3/common/constant"
 )
 
-var localIp string
-var localHostname string
+var (
+	localIp       string
+	localHostname string
+)
 
 func GetLocalIp() string {
 	if len(localIp) != 0 {
diff --git a/common/host_util_test.go b/common/host_util_test.go
index b78c4cf..4febece 100644
--- a/common/host_util_test.go
+++ b/common/host_util_test.go
@@ -34,6 +34,10 @@
 	assert.NotNil(t, GetLocalIp())
 }
 
+func TestGetLocalHostName(t *testing.T) {
+	assert.NotNil(t, GetLocalHostName())
+}
+
 func TestHandleRegisterIPAndPort(t *testing.T) {
 	url := NewURLWithOptions(WithIp("1.2.3.4"), WithPort("20000"))
 	HandleRegisterIPAndPort(url)
diff --git a/metrics/api.go b/metrics/api.go
index fdc69c1..d274dd1 100644
--- a/metrics/api.go
+++ b/metrics/api.go
@@ -25,9 +25,11 @@
 	"dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
 )
 
-var registries = make(map[string]func(*ReporterConfig) MetricRegistry)
-var collectors = make([]CollectorFunc, 0)
-var registry MetricRegistry
+var (
+	registries = make(map[string]func(*ReporterConfig) MetricRegistry)
+	collectors = make([]CollectorFunc, 0)
+	registry   MetricRegistry
+)
 
 // CollectorFunc used to extend more indicators
 type CollectorFunc func(MetricRegistry, *ReporterConfig)
@@ -59,15 +61,22 @@
 
 // MetricRegistry data container,data compute、expose、agg
 type MetricRegistry interface {
-	Counter(*MetricId) CounterMetric     // add or update a counter
-	Gauge(*MetricId) GaugeMetric         // add or update a gauge
-	Histogram(*MetricId) HistogramMetric // add a metric num to a histogram
-	Summary(*MetricId) SummaryMetric     // add a metric num to a summary
-	Export()                             // expose metric data, such as Prometheus http exporter
+	Counter(*MetricId) CounterMetric        // add or update a counter
+	Gauge(*MetricId) GaugeMetric            // add or update a gauge
+	Histogram(*MetricId) ObservableMetric   // add a metric num to a histogram
+	Summary(*MetricId) ObservableMetric     // add a metric num to a summary
+	Rt(*MetricId, *RtOpts) ObservableMetric // add a metric num to a rt
+	Export()                                // expose metric data, such as Prometheus http exporter
 	// GetMetrics() []*MetricSample // get all metric data
 	// GetMetricsString() (string, error) // get text format metric data
 }
 
+type RtOpts struct {
+	Aggregate         bool
+	BucketNum         int   // only for aggRt
+	TimeWindowSeconds int64 // only for aggRt
+}
+
 // multi registry,like micrometer CompositeMeterRegistry
 // type CompositeRegistry struct {
 // 	rs []MetricRegistry
@@ -131,14 +140,9 @@
 	// Sub(float64)
 }
 
-// HistogramMetric histogram metric
-type HistogramMetric interface {
-	Record(float64)
-}
-
-// SummaryMetric summary metric
-type SummaryMetric interface {
-	Record(float64)
+// histogram summary rt metric
+type ObservableMetric interface {
+	Observe(float64)
 }
 
 // StatesMetrics multi metrics,include total,success num, fail num,call MetricsRegistry save data
@@ -232,8 +236,13 @@
 	} else {
 		metricsCacheMutex.Lock()
 		defer metricsCacheMutex.Unlock()
-		n := supplier()
-		metricsCache[key] = n
-		return n
+		v, ok = metricsCache[key] // double check,avoid overwriting
+		if ok {
+			return v
+		} else {
+			n := supplier()
+			metricsCache[key] = n
+			return n
+		}
 	}
 }
diff --git a/metrics/prometheus/registry.go b/metrics/prometheus/registry.go
index 70946d6..21ebc0d 100644
--- a/metrics/prometheus/registry.go
+++ b/metrics/prometheus/registry.go
@@ -24,7 +24,6 @@
 
 import (
 	prom "github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
 
 	"github.com/prometheus/common/expfmt"
 )
@@ -35,97 +34,96 @@
 
 func init() {
 	metrics.SetRegistry("prometheus", func(rc *metrics.ReporterConfig) metrics.MetricRegistry {
-		return &promMetricRegistry{
-			cvm: make(map[string]*prom.CounterVec),
-			gvm: make(map[string]*prom.GaugeVec),
-			hvm: make(map[string]*prom.HistogramVec),
-			svm: make(map[string]*prom.SummaryVec),
-		}
+		return &promMetricRegistry{r: prom.DefaultRegisterer}
 	})
 }
 
 type promMetricRegistry struct {
-	mtx sync.RWMutex                  // Protects metrics.
-	cvm map[string]*prom.CounterVec   // prom.CounterVec
-	gvm map[string]*prom.GaugeVec     // prom.GaugeVec
-	hvm map[string]*prom.HistogramVec // prom.HistogramVec
-	svm map[string]*prom.SummaryVec   // prom.SummaryVec
+	r    prom.Registerer // for convenience of testing
+	vecs sync.Map
+}
+
+func (p *promMetricRegistry) getOrComputeVec(key string, supplier func() interface{}) interface{} {
+	v, ok := p.vecs.Load(key)
+	if !ok {
+		v, ok = p.vecs.LoadOrStore(key, supplier())
+		if !ok {
+			p.r.MustRegister(v.(prom.Collector)) // only registe collector which stored success
+		}
+	}
+	return v
 }
 
 func (p *promMetricRegistry) Counter(m *metrics.MetricId) metrics.CounterMetric {
-	p.mtx.RLock()
-	vec, ok := p.cvm[m.Name]
-	p.mtx.RUnlock()
-	if !ok {
-		p.mtx.Lock()
-		vec = promauto.NewCounterVec(prom.CounterOpts{
+	vec := p.getOrComputeVec(m.Name, func() interface{} {
+		return prom.NewCounterVec(prom.CounterOpts{
 			Name: m.Name,
 			Help: m.Desc,
 		}, m.TagKeys())
-		p.cvm[m.Name] = vec
-		p.mtx.Unlock()
-	}
-	c := vec.With(m.Tags)
-	return &counter{pc: c}
+	}).(*prom.CounterVec)
+	return vec.With(m.Tags)
 }
 
 func (p *promMetricRegistry) Gauge(m *metrics.MetricId) metrics.GaugeMetric {
-	p.mtx.RLock()
-	vec, ok := p.gvm[m.Name]
-	p.mtx.RUnlock()
-	if !ok {
-		p.mtx.Lock()
-		vec = promauto.NewGaugeVec(prom.GaugeOpts{
+	vec := p.getOrComputeVec(m.Name, func() interface{} {
+		return prom.NewGaugeVec(prom.GaugeOpts{
 			Name: m.Name,
 			Help: m.Desc,
 		}, m.TagKeys())
-		p.gvm[m.Name] = vec
-		p.mtx.Unlock()
-	}
-	g := vec.With(m.Tags)
-	return &gauge{pg: g}
+	}).(*prom.GaugeVec)
+	return vec.With(m.Tags)
 }
 
-func (p *promMetricRegistry) Histogram(m *metrics.MetricId) metrics.HistogramMetric {
-	p.mtx.RLock()
-	vec, ok := p.hvm[m.Name]
-	p.mtx.RUnlock()
-	if !ok {
-		p.mtx.Lock()
-		vec = promauto.NewHistogramVec(prom.HistogramOpts{
+func (p *promMetricRegistry) Histogram(m *metrics.MetricId) metrics.ObservableMetric {
+	vec := p.getOrComputeVec(m.Name, func() interface{} {
+		return prom.NewHistogramVec(prom.HistogramOpts{
 			Name: m.Name,
 			Help: m.Desc,
 		}, m.TagKeys())
-		p.hvm[m.Name] = vec
-		p.mtx.Unlock()
-	}
-	h := vec.With(m.Tags)
-	return &histogram{ph: h.(prom.Histogram)}
+	}).(*prom.HistogramVec)
+	return vec.With(m.Tags)
 }
 
-func (p *promMetricRegistry) Summary(m *metrics.MetricId) metrics.SummaryMetric {
-	p.mtx.RLock()
-	vec, ok := p.svm[m.Name]
-	p.mtx.RUnlock()
-	if !ok {
-		p.mtx.Lock()
-		vec = promauto.NewSummaryVec(prom.SummaryOpts{
+func (p *promMetricRegistry) Summary(m *metrics.MetricId) metrics.ObservableMetric {
+	vec := p.getOrComputeVec(m.Name, func() interface{} {
+		return prom.NewSummaryVec(prom.SummaryOpts{
 			Name: m.Name,
 			Help: m.Desc,
 		}, m.TagKeys())
-		p.svm[m.Name] = vec
-		p.mtx.Unlock()
+	}).(*prom.SummaryVec)
+	return vec.With(m.Tags)
+}
+
+func (p *promMetricRegistry) Rt(m *metrics.MetricId, opts *metrics.RtOpts) metrics.ObservableMetric {
+	var supplier func() interface{}
+	if opts != nil && opts.Aggregate {
+		supplier = func() interface{} {
+			// TODO set default aggregate config from config
+			return NewAggRtVec(&RtOpts{
+				Name:              m.Name,
+				Help:              m.Desc,
+				bucketNum:         opts.BucketNum,
+				timeWindowSeconds: opts.TimeWindowSeconds,
+			}, m.TagKeys())
+		}
+	} else {
+		supplier = func() interface{} {
+			return NewRtVec(&RtOpts{
+				Name: m.Name,
+				Help: m.Desc,
+			}, m.TagKeys())
+		}
 	}
-	s := vec.With(m.Tags)
-	return &summary{ps: s.(prom.Summary)}
+	vec := p.getOrComputeVec(m.Name, supplier).(*RtVec)
+	return vec.With(m.Tags)
 }
 
 func (p *promMetricRegistry) Export() {
-
+	// use promauto export global, TODO move here
 }
 
 func (p *promMetricRegistry) Scrape() (string, error) {
-	r := prom.DefaultRegisterer.(*prom.Registry)
+	r := p.r.(prom.Gatherer)
 	gathering, err := r.Gather()
 	if err != nil {
 		return "", err
@@ -138,52 +136,3 @@
 	}
 	return out.String(), nil
 }
-
-type counter struct {
-	pc prom.Counter
-}
-
-func (c *counter) Inc() {
-	c.pc.Inc()
-}
-func (c *counter) Add(v float64) {
-	c.pc.Add(v)
-}
-
-type gauge struct {
-	pg prom.Gauge
-}
-
-//	func (g *gauge) Inc() {
-//		g.pg.Inc()
-//	}
-//
-//	func (g *gauge) Dec() {
-//		g.pg.Dec()
-//	}
-func (g *gauge) Set(v float64) {
-	g.pg.Set(v)
-}
-
-// func (g *gauge) Add(v float64) {
-// 	g.pg.Add(v)
-// }
-// func (g *gauge) Sub(v float64) {
-// 	g.pg.Sub(v)
-// }
-
-type histogram struct {
-	ph prom.Histogram
-}
-
-func (h *histogram) Record(v float64) {
-	h.ph.Observe(v)
-}
-
-type summary struct {
-	ps prom.Summary
-}
-
-func (s *summary) Record(v float64) {
-	s.ps.Observe(v)
-}
diff --git a/metrics/prometheus/registry_test.go b/metrics/prometheus/registry_test.go
new file mode 100644
index 0000000..7e7ac2a
--- /dev/null
+++ b/metrics/prometheus/registry_test.go
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package prometheus
+
+import (
+	"sync"
+	"testing"
+)
+
+import (
+	prom "github.com/prometheus/client_golang/prometheus"
+
+	"github.com/stretchr/testify/assert"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/metrics"
+)
+
+var (
+	tags     = map[string]string{"app": "dubbo", "version": "1.0.0"}
+	metricId = &metrics.MetricId{Name: "dubbo_request", Desc: "request", Tags: tags}
+)
+
+func TestPromMetricRegistryCounter(t *testing.T) {
+	p := &promMetricRegistry{r: prom.NewRegistry()}
+	p.Counter(metricId).Inc()
+	text, err := p.Scrape()
+	assert.Nil(t, err)
+	assert.Contains(t, text, "# HELP dubbo_request request\n# TYPE dubbo_request counter")
+	assert.Contains(t, text, `dubbo_request{app="dubbo",version="1.0.0"} 1`)
+}
+
+func TestPromMetricRegistryGauge(t *testing.T) {
+	p := &promMetricRegistry{r: prom.NewRegistry()}
+	p.Gauge(metricId).Set(100)
+	text, err := p.Scrape()
+	assert.Nil(t, err)
+	assert.Contains(t, text, "# HELP dubbo_request request\n# TYPE dubbo_request gauge")
+	assert.Contains(t, text, `dubbo_request{app="dubbo",version="1.0.0"} 100`)
+
+}
+
+func TestPromMetricRegistryHistogram(t *testing.T) {
+	p := &promMetricRegistry{r: prom.NewRegistry()}
+	p.Histogram(metricId).Observe(100)
+	text, err := p.Scrape()
+	assert.Nil(t, err)
+	assert.Contains(t, text, "# HELP dubbo_request request\n# TYPE dubbo_request histogram")
+	assert.Contains(t, text, `dubbo_request_bucket{app="dubbo",version="1.0.0",le="+Inf"} 1`)
+	assert.Contains(t, text, `dubbo_request_sum{app="dubbo",version="1.0.0"} 100`)
+	assert.Contains(t, text, `dubbo_request_count{app="dubbo",version="1.0.0"} 1`)
+}
+
+func TestPromMetricRegistrySummary(t *testing.T) {
+	p := &promMetricRegistry{r: prom.NewRegistry()}
+	p.Summary(metricId).Observe(100)
+	text, err := p.Scrape()
+	assert.Nil(t, err)
+	assert.Contains(t, text, "# HELP dubbo_request request\n# TYPE dubbo_request summary")
+	assert.Contains(t, text, "dubbo_request_sum{app=\"dubbo\",version=\"1.0.0\"} 100")
+	assert.Contains(t, text, "dubbo_request_count{app=\"dubbo\",version=\"1.0.0\"} 1")
+}
+
+func TestPromMetricRegistryRt(t *testing.T) {
+	p := &promMetricRegistry{r: prom.NewRegistry()}
+	for i := 0; i < 10; i++ {
+		p.Rt(metricId, &metrics.RtOpts{}).Observe(10 * float64(i))
+	}
+	text, err := p.Scrape()
+	assert.Nil(t, err)
+	assert.Contains(t, text, "# HELP dubbo_request_avg Average request\n# TYPE dubbo_request_avg gauge\ndubbo_request_avg{app=\"dubbo\",version=\"1.0.0\"} 45")
+	assert.Contains(t, text, "# HELP dubbo_request_last Last request\n# TYPE dubbo_request_last gauge\ndubbo_request_last{app=\"dubbo\",version=\"1.0.0\"} 90")
+	assert.Contains(t, text, "# HELP dubbo_request_max Max request\n# TYPE dubbo_request_max gauge\ndubbo_request_max{app=\"dubbo\",version=\"1.0.0\"} 90")
+	assert.Contains(t, text, "# HELP dubbo_request_min Min request\n# TYPE dubbo_request_min gauge\ndubbo_request_min{app=\"dubbo\",version=\"1.0.0\"} 0")
+	assert.Contains(t, text, "# HELP dubbo_request_sum Sum request\n# TYPE dubbo_request_sum gauge\ndubbo_request_sum{app=\"dubbo\",version=\"1.0.0\"} 450")
+
+	p = &promMetricRegistry{r: prom.NewRegistry()}
+	for i := 0; i < 10; i++ {
+		p.Rt(metricId, &metrics.RtOpts{Aggregate: true, BucketNum: 10, TimeWindowSeconds: 60}).Observe(10 * float64(i))
+	}
+	text, err = p.Scrape()
+	assert.Nil(t, err)
+	assert.Contains(t, text, "# HELP dubbo_request_avg_milliseconds_aggregate The average request\n# TYPE dubbo_request_avg_milliseconds_aggregate gauge\ndubbo_request_avg_milliseconds_aggregate{app=\"dubbo\",version=\"1.0.0\"} 45")
+	assert.Contains(t, text, "# HELP dubbo_request_max_milliseconds_aggregate The maximum request\n# TYPE dubbo_request_max_milliseconds_aggregate gauge\ndubbo_request_max_milliseconds_aggregate{app=\"dubbo\",version=\"1.0.0\"} 90")
+	assert.Contains(t, text, "# HELP dubbo_request_min_milliseconds_aggregate The minimum request\n# TYPE dubbo_request_min_milliseconds_aggregate gauge\ndubbo_request_min_milliseconds_aggregate{app=\"dubbo\",version=\"1.0.0\"} 0")
+}
+
+func TestPromMetricRegistryCounterConcurrent(t *testing.T) {
+	p := &promMetricRegistry{r: prom.NewRegistry()}
+	var wg sync.WaitGroup
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			p.Counter(metricId).Inc()
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	text, err := p.Scrape()
+	assert.Nil(t, err)
+	assert.Contains(t, text, "# HELP dubbo_request request\n# TYPE dubbo_request counter")
+	assert.Contains(t, text, `dubbo_request{app="dubbo",version="1.0.0"} 10`)
+}
diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go
index 715fd97..0e9fedc 100644
--- a/metrics/prometheus/reporter.go
+++ b/metrics/prometheus/reporter.go
@@ -26,6 +26,7 @@
 
 import (
 	"github.com/dubbogo/gost/log/logger"
+
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 )
diff --git a/metrics/prometheus/rt_vec.go b/metrics/prometheus/rt_vec.go
new file mode 100644
index 0000000..265374c
--- /dev/null
+++ b/metrics/prometheus/rt_vec.go
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package prometheus
+
+import (
+	"bytes"
+	"sync"
+)
+
+import (
+	prom "github.com/prometheus/client_golang/prometheus"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
+)
+
+type rtMetric struct {
+	nameSuffix string
+	helpPrefix string
+	valueFunc  func(*aggregate.Result) float64
+}
+
+func (m *rtMetric) desc(opts *RtOpts, labels []string) *prom.Desc {
+	return prom.NewDesc(prom.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name+m.nameSuffix), m.helpPrefix+opts.Help, labels, opts.ConstLabels)
+}
+
+var (
+	rtMetrics  = make([]*rtMetric, 5)
+	aggMetrics = make([]*rtMetric, 3)
+)
+
+func init() {
+	rtMetrics[0] = &rtMetric{nameSuffix: "_sum", helpPrefix: "Sum ", valueFunc: func(r *aggregate.Result) float64 { return r.Total }}
+	rtMetrics[1] = &rtMetric{nameSuffix: "_last", helpPrefix: "Last ", valueFunc: func(r *aggregate.Result) float64 { return r.Last }}
+	rtMetrics[2] = &rtMetric{nameSuffix: "_min", helpPrefix: "Min ", valueFunc: func(r *aggregate.Result) float64 { return r.Min }}
+	rtMetrics[3] = &rtMetric{nameSuffix: "_max", helpPrefix: "Max ", valueFunc: func(r *aggregate.Result) float64 { return r.Max }}
+	rtMetrics[4] = &rtMetric{nameSuffix: "_avg", helpPrefix: "Average ", valueFunc: func(r *aggregate.Result) float64 { return r.Avg }}
+
+	aggMetrics[0] = &rtMetric{nameSuffix: "_avg_milliseconds_aggregate", helpPrefix: "The average ", valueFunc: func(r *aggregate.Result) float64 { return r.Avg }}
+	aggMetrics[1] = &rtMetric{nameSuffix: "_min_milliseconds_aggregate", helpPrefix: "The minimum ", valueFunc: func(r *aggregate.Result) float64 { return r.Min }}
+	aggMetrics[2] = &rtMetric{nameSuffix: "_max_milliseconds_aggregate", helpPrefix: "The maximum ", valueFunc: func(r *aggregate.Result) float64 { return r.Max }}
+}
+
+type RtOpts struct {
+	Namespace         string
+	Subsystem         string
+	Name              string
+	Help              string
+	ConstLabels       prom.Labels
+	bucketNum         int   // only for aggRt
+	timeWindowSeconds int64 // only for aggRt
+}
+
+type observer interface {
+	Observe(val float64)
+	result() *aggregate.Result
+}
+
+type aggResult struct {
+	agg *aggregate.TimeWindowAggregator
+}
+
+func (r *aggResult) Observe(val float64) {
+	r.agg.Add(val)
+}
+
+func (r *aggResult) result() *aggregate.Result {
+	return r.agg.Result()
+}
+
+type valueResult struct {
+	mtx sync.RWMutex
+	val *aggregate.Result
+}
+
+func (r *valueResult) Observe(val float64) {
+	r.mtx.Lock()
+	defer r.mtx.Unlock()
+	r.val.Update(val)
+}
+
+func (r *valueResult) result() *aggregate.Result {
+	res := aggregate.NewResult()
+	r.mtx.RLock()
+	res.Merge(r.val)
+	r.mtx.RUnlock()
+	return res.Get()
+}
+
+type Rt struct {
+	tags map[string]string
+	obs  observer
+}
+
+func (r *Rt) Observe(val float64) {
+	r.obs.Observe(val)
+}
+
+func buildKey(m map[string]string, labNames []string) string {
+	var buffer bytes.Buffer
+	for _, label := range labNames {
+		if buffer.Len() != 0 {
+			buffer.WriteString("_")
+		}
+		buffer.WriteString(m[label])
+	}
+	return buffer.String()
+}
+
+func buildLabelValues(m map[string]string, labNames []string) []string {
+	values := make([]string, len(labNames))
+	for i, label := range labNames {
+		values[i] = m[label]
+	}
+	return values
+}
+
+type RtVec struct {
+	opts       *RtOpts
+	labelNames []string
+	aggMap     sync.Map
+	initFunc   func(tags map[string]string) *Rt
+	metrics    []*rtMetric
+}
+
+func NewRtVec(opts *RtOpts, labNames []string) *RtVec {
+	return &RtVec{
+		opts:       opts,
+		labelNames: labNames,
+		metrics:    rtMetrics,
+		initFunc: func(tags map[string]string) *Rt {
+			return &Rt{
+				tags: tags,
+				obs:  &valueResult{val: aggregate.NewResult()},
+			}
+		},
+	}
+}
+
+func NewAggRtVec(opts *RtOpts, labNames []string) *RtVec {
+	return &RtVec{
+		opts:       opts,
+		labelNames: labNames,
+		metrics:    aggMetrics,
+		initFunc: func(tags map[string]string) *Rt {
+			return &Rt{
+				tags: tags,
+				obs:  &aggResult{agg: aggregate.NewTimeWindowAggregator(opts.bucketNum, opts.timeWindowSeconds)},
+			}
+		},
+	}
+}
+
+func (r *RtVec) With(tags map[string]string) prom.Observer {
+	k := buildKey(tags, r.labelNames)
+	return r.computeIfAbsent(k, func() *Rt {
+		return r.initFunc(tags)
+	})
+}
+
+func (r *RtVec) computeIfAbsent(k string, supplier func() *Rt) *Rt {
+	v, ok := r.aggMap.Load(k)
+	if !ok {
+		v, _ = r.aggMap.LoadOrStore(k, supplier())
+	}
+	return v.(*Rt)
+}
+
+func (r *RtVec) Collect(ch chan<- prom.Metric) {
+	r.aggMap.Range(func(_, val interface{}) bool {
+		v := val.(*Rt)
+		res := v.obs.result()
+		for _, m := range r.metrics {
+			ch <- prom.MustNewConstMetric(m.desc(r.opts, r.labelNames), prom.GaugeValue, m.valueFunc(res), buildLabelValues(v.tags, r.labelNames)...)
+		}
+		return true
+	})
+}
+
+func (r *RtVec) Describe(ch chan<- *prom.Desc) {
+	for _, m := range r.metrics {
+		ch <- m.desc(r.opts, r.labelNames)
+	}
+}
diff --git a/metrics/prometheus/rt_vec_test.go b/metrics/prometheus/rt_vec_test.go
new file mode 100644
index 0000000..8964798
--- /dev/null
+++ b/metrics/prometheus/rt_vec_test.go
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package prometheus
+
+import (
+	"reflect"
+	"sync"
+	"testing"
+)
+
+import (
+	prom "github.com/prometheus/client_golang/prometheus"
+
+	"github.com/stretchr/testify/assert"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
+)
+
+func TestRtVecCollect(t *testing.T) {
+	opts := &RtOpts{
+		Name:              "request_num",
+		bucketNum:         10,
+		timeWindowSeconds: 120,
+		Help:              "Request cost",
+	}
+	labels := []string{"app", "version"}
+	vecs := [2]*RtVec{NewRtVec(opts, labels), NewAggRtVec(opts, labels)}
+	for _, r := range vecs {
+		r.With(map[string]string{"app": "dubbo", "version": "1.0.0"}).Observe(100)
+		ch := make(chan prom.Metric, len(r.metrics))
+		r.Collect(ch)
+		close(ch)
+		assert.Equal(t, len(ch), len(r.metrics))
+		for _, m := range r.metrics {
+			metric, ok := <-ch
+			if !ok {
+				t.Error("not enough metrics")
+			} else {
+				str := metric.Desc().String()
+				assert.Contains(t, str, m.nameSuffix)
+				assert.Contains(t, str, m.helpPrefix)
+				assert.Contains(t, str, "app")
+				assert.Contains(t, str, "version")
+			}
+		}
+	}
+}
+
+func TestRtVecDescribe(t *testing.T) {
+	opts := &RtOpts{
+		Name:              "request_num",
+		bucketNum:         10,
+		timeWindowSeconds: 120,
+		Help:              "Request cost",
+	}
+	labels := []string{"app", "version"}
+	vecs := [2]*RtVec{NewRtVec(opts, labels), NewAggRtVec(opts, labels)}
+	for _, r := range vecs {
+		ch := make(chan *prom.Desc, len(r.metrics))
+		r.Describe(ch)
+		close(ch)
+		assert.Equal(t, len(ch), len(r.metrics))
+		for _, m := range r.metrics {
+			desc, ok := <-ch
+			if !ok {
+				t.Error(t, "not enough desc")
+			} else {
+				str := desc.String()
+				assert.Contains(t, str, m.nameSuffix)
+				assert.Contains(t, str, m.helpPrefix)
+				assert.Contains(t, str, "app")
+				assert.Contains(t, str, "version")
+			}
+		}
+	}
+}
+
+func TestValueObserve(t *testing.T) {
+	rts := []*Rt{
+		{
+			tags: map[string]string{},
+			obs:  &aggResult{agg: aggregate.NewTimeWindowAggregator(10, 10)},
+		},
+		{
+			tags: map[string]string{},
+			obs:  &valueResult{val: aggregate.NewResult()},
+		},
+	}
+	want := &aggregate.Result{
+		Avg:   1,
+		Min:   1,
+		Max:   1,
+		Count: 1,
+		Total: 1,
+		Last:  1,
+	}
+	for i, v := range rts {
+		v.Observe(float64(1))
+		r := v.obs.result()
+		if i == 0 {
+			r.Last = 1 // agg result no Last, value is NaN
+		}
+		if !reflect.DeepEqual(r, want) {
+			t.Errorf("Result() = %v, want %v", r, want)
+		}
+	}
+}
+
+func TestRtVecWith(t *testing.T) {
+	opts := &RtOpts{
+		Name:              "request_num",
+		bucketNum:         10,
+		timeWindowSeconds: 120,
+		Help:              "Request cost",
+	}
+	labels := []string{"app", "version"}
+	vecs := [2]*RtVec{NewRtVec(opts, labels), NewAggRtVec(opts, labels)}
+	tags := map[string]string{"app": "dubbo", "version": "1.0.0"}
+	for _, r := range vecs {
+		first := r.With(tags)
+		second := r.With(tags)
+		assert.True(t, first == second) // init once
+	}
+}
+
+func TestRtVecWithConcurrent(t *testing.T) {
+	opts := &RtOpts{
+		Name:              "request_num",
+		bucketNum:         10,
+		timeWindowSeconds: 120,
+		Help:              "Request cost",
+	}
+	labels := []string{"app", "version"}
+	labelValues := map[string]string{"app": "dubbo", "version": "1.0.0"}
+	vecs := [2]*RtVec{NewRtVec(opts, labels), NewAggRtVec(opts, labels)}
+	for _, r := range vecs {
+		var wg sync.WaitGroup
+		for i := 0; i < 10; i++ {
+			wg.Add(1)
+			go func() {
+				r.With(labelValues).Observe(100)
+				wg.Done()
+			}()
+		}
+		wg.Wait()
+		res := r.With(labelValues).(*Rt).obs.result()
+		assert.True(t, res.Count == uint64(10))
+	}
+}
diff --git a/metrics/prometheus/util.go b/metrics/prometheus/util.go
index 4e1a9c8..29e5ebf 100644
--- a/metrics/prometheus/util.go
+++ b/metrics/prometheus/util.go
@@ -24,6 +24,7 @@
 
 import (
 	"github.com/dubbogo/gost/log/logger"
+
 	"github.com/prometheus/client_golang/prometheus"
 )
 
diff --git a/metrics/registry/collector.go b/metrics/registry/collector.go
index 7479fd2..63840ed 100644
--- a/metrics/registry/collector.go
+++ b/metrics/registry/collector.go
@@ -101,7 +101,7 @@
 	// Event is converted to metrics
 	// Save metrics to the MetricRegistry
 	rc.regRegistry.Counter(metrics.NewMetricId(NotifyMetricRequests, metrics.GetApplicationLevel())).Inc()
-	rc.regRegistry.Histogram(metrics.NewMetricId(NotifyMetricNumLast, metrics.GetApplicationLevel())).Record(float64(event.End.UnixNano()) / float64(time.Second))
+	rc.regRegistry.Histogram(metrics.NewMetricId(NotifyMetricNumLast, metrics.GetApplicationLevel())).Observe(float64(event.End.UnixNano()) / float64(time.Second))
 	metric := metrics.ComputeIfAbsentCache(dubboNotifyRt, func() interface{} {
 		return newTimeMetrics(NotifyRtMillisecondsMin, NotifyRtMillisecondsMax, NotifyRtMillisecondsAvg, NotifyRtMillisecondsSum, NotifyRtMillisecondsLast, metrics.GetApplicationLevel(), rc.regRegistry)
 	}).(metrics.TimeMetric)
diff --git a/metrics/util/aggregate/aggregator.go b/metrics/util/aggregate/aggregator.go
index 0f31f3f..a584cc6 100644
--- a/metrics/util/aggregate/aggregator.go
+++ b/metrics/util/aggregate/aggregator.go
@@ -45,36 +45,49 @@
 	Max   float64
 	Avg   float64
 	Count uint64
+	Last  float64
+}
+
+func NewResult() *Result {
+	return &Result{
+		Min:  math.MaxFloat64,
+		Max:  math.SmallestNonzeroFloat64,
+		Last: math.NaN(),
+	}
+}
+
+func (r *Result) Update(v float64) {
+	r.Min = math.Min(r.Min, v)
+	r.Max = math.Max(r.Max, v)
+	r.Last = v
+	r.Total += v
+	r.Count++
+}
+
+func (r *Result) Merge(o *Result) {
+	r.Min = math.Min(r.Min, o.Min)
+	r.Max = math.Max(r.Max, o.Max)
+	r.Total += o.Total
+	r.Count += o.Count
+	r.Last = o.Last
+}
+
+func (r *Result) Get() *Result {
+	if r.Count > 0 {
+		r.Avg = r.Total / float64(r.Count)
+	}
+	return r
 }
 
 // Result returns the aggregate result of the sliding window by aggregating all panes.
 func (t *TimeWindowAggregator) Result() *Result {
 	t.mux.RLock()
 	defer t.mux.RUnlock()
-
-	res := &Result{}
-
-	total := 0.0
-	count := uint64(0)
-	max := math.SmallestNonzeroFloat64
-	min := math.MaxFloat64
-
+	res := NewResult()
 	for _, v := range t.window.values(time.Now().UnixMilli()) {
-		total += v.(*aggregator).total
-		count += v.(*aggregator).count
-		max = math.Max(max, v.(*aggregator).max)
-		min = math.Min(min, v.(*aggregator).min)
+		res.Merge(v.(*Result)) // Last not as expect, but agg result has no Last value
 	}
-
-	if count > 0 {
-		res.Avg = total / float64(count)
-		res.Count = count
-		res.Total = total
-		res.Max = max
-		res.Min = min
-	}
-
-	return res
+	return res.Get()
 }
 
 // Add adds a value to the sliding window's current pane.
@@ -82,52 +95,9 @@
 	t.mux.Lock()
 	defer t.mux.Unlock()
 
-	t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue).value.(*aggregator).add(v)
+	t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue).value.(*Result).Update(v)
 }
 
 func (t *TimeWindowAggregator) newEmptyValue() interface{} {
-	return newAggregator()
-}
-
-// aggregator is a custom struct to aggregate data.
-//
-// It is NOT concurrent-safe.
-// It aggregates data by calculating the min, max, total and count.
-type aggregator struct {
-	min   float64
-	max   float64
-	total float64
-	count uint64
-}
-
-func newAggregator() *aggregator {
-	return &aggregator{
-		min:   math.MaxFloat64,
-		max:   math.SmallestNonzeroFloat64,
-		total: float64(0),
-		count: uint64(0),
-	}
-}
-
-func (a *aggregator) add(v float64) {
-	a.updateMin(v)
-	a.updateMax(v)
-	a.updateTotal(v)
-	a.updateCount()
-}
-
-func (a *aggregator) updateMin(v float64) {
-	a.min = math.Min(a.min, v)
-}
-
-func (a *aggregator) updateMax(v float64) {
-	a.max = math.Max(a.max, v)
-}
-
-func (a *aggregator) updateTotal(v float64) {
-	a.total += v
-}
-
-func (a *aggregator) updateCount() {
-	a.count++
+	return NewResult()
 }
diff --git a/metrics/util/aggregate/aggregator_test.go b/metrics/util/aggregate/aggregator_test.go
index d3fa4f6..ab661dc 100644
--- a/metrics/util/aggregate/aggregator_test.go
+++ b/metrics/util/aggregate/aggregator_test.go
@@ -48,7 +48,9 @@
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if got := timeWindowAggregator.Result(); !reflect.DeepEqual(got, tt.want) {
+			got := timeWindowAggregator.Result()
+			got.Last = 0 // NaN can not equal
+			if !reflect.DeepEqual(got, tt.want) {
 				t.Errorf("Result() = %v, want %v", got, tt.want)
 			}
 		})