Merge branch 'main' into feature-metrics
diff --git a/common/constant/key.go b/common/constant/key.go
index 514f81c..b786b99 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -177,6 +177,7 @@
const (
ApplicationKey = "application"
ApplicationNameKey = "application_name"
+ ApplicationVersionKey = "application_version"
HostnameKey = "hostname"
IpKey = "ip"
OrganizationKey = "organization"
@@ -197,6 +198,10 @@
ProvidersCategory = "providers"
RouterKey = "router"
ExportKey = "export"
+ GitCommitIdKey = "git_commit_id"
+ ConfigCenterKey = "config_center"
+ ChangeTypeKey = "change_type"
+ KeyKey = "key"
)
// config center keys
@@ -404,3 +409,10 @@
LoggerFileLocalTimeKey = "logger.file.local-time"
LoggerFileCompressKey = "logger.file.compress"
)
+
+// metrics key
+const (
+ MetricsRegistry = "dubbo.metrics.registry"
+ MetricsMetadata = "dubbo.metrics.metadata"
+ MetricApp = "dubbo.metrics.app"
+)
diff --git a/common/host_util.go b/common/host_util.go
index ba36c0c..5c411ab 100644
--- a/common/host_util.go
+++ b/common/host_util.go
@@ -23,6 +23,7 @@
)
import (
+ "github.com/dubbogo/gost/log/logger"
gxnet "github.com/dubbogo/gost/net"
)
@@ -31,6 +32,7 @@
)
var localIp string
+var localHostname string
func GetLocalIp() string {
if len(localIp) != 0 {
@@ -40,6 +42,18 @@
return localIp
}
+// GetLocalHostName returns the local host name as reported by os.Hostname.
+// A non-empty result is cached in localHostname and reused by later calls.
+// When the lookup fails the error is logged and whatever name os.Hostname
+// returned is stored; if that name is empty the next call retries the lookup.
+func GetLocalHostName() string {
+	if len(localHostname) != 0 {
+		return localHostname
+	}
+	hostname, err := os.Hostname()
+	if err != nil {
+		// Include the cause so a failed lookup can actually be diagnosed.
+		logger.Errorf("can not get local hostname: %v", err)
+	}
+	localHostname = hostname
+	return localHostname
+}
+
func HandleRegisterIPAndPort(url *URL) {
// if developer define registry port and ip, use it first.
if ipToRegistry := os.Getenv(constant.DubboIpToRegistryKey); len(ipToRegistry) > 0 {
diff --git a/config/config_center_config.go b/config/config_center_config.go
index 5ed3f7d..e206b8a 100644
--- a/config/config_center_config.go
+++ b/config/config_center_config.go
@@ -38,6 +38,9 @@
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config_center"
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+ metricsConfigCenter "dubbo.apache.org/dubbo-go/v3/metrics/config_center"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
)
// CenterConfig is configuration for config center
@@ -146,6 +149,7 @@
logger.Warnf("[Config Center] Dynamic config center has started, but config may not be initialized, because: %s", err)
return nil
}
+ defer metrics.Publish(metricsConfigCenter.NewIncMetricEvent(cc.DataId, cc.Group, remoting.EventTypeAdd, cc.Protocol))
if len(strConf) == 0 {
logger.Warnf("[Config Center] Dynamic config center has started, but got empty config with config-center configuration %+v\n"+
"Please check if your config-center config is correct.", cc)
diff --git a/config/instance/metadata_report.go b/config/instance/metadata_report.go
index 1cb5f63..16ba071 100644
--- a/config/instance/metadata_report.go
+++ b/config/instance/metadata_report.go
@@ -34,11 +34,10 @@
)
func GetMetadataReportInstance() report.MetadataReport {
- if instance != nil {
- return instance
+ if instance == nil {
+ instance = report.NewPubMetricEventReport(GetMetadataReportByRegistryProtocol(""))
}
-
- return GetMetadataReportByRegistryProtocol("")
+ return instance
}
// SetMetadataReportInstance, init metadat report instance
@@ -49,7 +48,7 @@
url = selectiveUrl[0]
fac := extension.GetMetadataReportFactory(url.Protocol)
if fac != nil {
- instance = fac.CreateMetadataReport(url)
+ instance = report.NewPubMetricEventReport(fac.CreateMetadataReport(url))
}
reportUrl = url
}
diff --git a/config/metric_config.go b/config/metric_config.go
index 3cc65c4..a6b8749 100644
--- a/config/metric_config.go
+++ b/config/metric_config.go
@@ -39,6 +39,7 @@
Path string `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"`
PushGatewayAddress string `default:"" yaml:"push-gateway-address" json:"push-gateway-address,omitempty" property:"push-gateway-address"`
SummaryMaxAge int64 `default:"600000000000" yaml:"summary-max-age" json:"summary-max-age,omitempty" property:"summary-max-age"`
+ Protocol string `default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
}
func (mc *MetricConfig) ToReporterConfig() *metrics.ReporterConfig {
@@ -55,6 +56,7 @@
defaultMetricsReportConfig.Path = mc.Path
defaultMetricsReportConfig.PushGatewayAddress = mc.PushGatewayAddress
defaultMetricsReportConfig.SummaryMaxAge = mc.SummaryMaxAge
+ defaultMetricsReportConfig.Protocol = mc.Protocol
return defaultMetricsReportConfig
}
@@ -68,7 +70,10 @@
if err := verify(mc); err != nil {
return err
}
- extension.GetMetricReporter("prometheus", mc.ToReporterConfig())
+ metrics.InitAppInfo(GetRootConfig().Application.Name, GetRootConfig().Application.Version)
+ config := mc.ToReporterConfig()
+ extension.GetMetricReporter(mc.Protocol, config)
+ metrics.Init(config)
return nil
}
@@ -91,7 +96,7 @@
mc.Enable = newMetricConfig.Enable
logger.Infof("MetricConfig's Enable was dynamically updated, new value:%v", mc.Enable)
- extension.GetMetricReporter("prometheus", mc.ToReporterConfig())
+ extension.GetMetricReporter(mc.Protocol, mc.ToReporterConfig())
}
}
}
diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go
index defcafe..58e9f08 100644
--- a/config_center/nacos/listener.go
+++ b/config_center/nacos/listener.go
@@ -31,11 +31,14 @@
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config_center"
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+ metricsConfigCenter "dubbo.apache.org/dubbo-go/v3/metrics/config_center"
"dubbo.apache.org/dubbo-go/v3/remoting"
)
-func callback(listener config_center.ConfigurationListener, _, _, dataId, data string) {
+func callback(listener config_center.ConfigurationListener, _, group, dataId, data string) {
listener.Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate})
+ metrics.Publish(metricsConfigCenter.NewIncMetricEvent(dataId, group, remoting.EventTypeUpdate, metricsConfigCenter.Nacos))
}
func (n *nacosDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) {
diff --git a/config_center/zookeeper/listener.go b/config_center/zookeeper/listener.go
index 3d31179..12454d1 100644
--- a/config_center/zookeeper/listener.go
+++ b/config_center/zookeeper/listener.go
@@ -25,6 +25,8 @@
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config_center"
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+ metricsConfigCenter "dubbo.apache.org/dubbo-go/v3/metrics/config_center"
"dubbo.apache.org/dubbo-go/v3/remoting"
"dubbo.apache.org/dubbo-go/v3/remoting/zookeeper"
)
@@ -73,10 +75,12 @@
changeType = remoting.EventTypeDel
}
+ key, group := l.pathToKeyGroup(event.Path)
+ defer metrics.Publish(metricsConfigCenter.NewIncMetricEvent(key, group, changeType, metricsConfigCenter.Zookeeper))
if listeners, ok := l.keyListeners.Load(event.Path); ok {
for listener := range listeners.(map[config_center.ConfigurationListener]struct{}) {
listener.Process(&config_center.ConfigChangeEvent{
- Key: l.pathToKey(event.Path),
+ Key: key,
Value: event.Content,
ConfigType: changeType,
})
@@ -86,10 +90,11 @@
return false
}
-func (l *CacheListener) pathToKey(path string) string {
+// pathToKeyGroup converts a node path into its configuration key and group.
+// Every occurrence of l.rootPath followed by the path separator is removed and
+// the remaining path separators are turned into dots; the segment before the
+// first dot is the group and everything after it is the key.
+// It returns (key, group). An empty path yields (path, ""), and a path with no
+// separator left after stripping yields (whole remainder, "").
+func (l *CacheListener) pathToKeyGroup(path string) (string, string) {
if len(path) == 0 {
- return path
+		return path, ""
}
groupKey := strings.Replace(strings.Replace(path, l.rootPath+constant.PathSeparator, "", -1), constant.PathSeparator, constant.DotSeparator, -1)
- return groupKey[strings.Index(groupKey, constant.DotSeparator)+1:]
+	index := strings.Index(groupKey, constant.DotSeparator)
+	if index < 0 {
+		// No group segment: slicing with index -1 would panic, so treat the
+		// whole remainder as the key, as the former pathToKey did.
+		return groupKey, ""
+	}
+	return groupKey[index+1:], groupKey[:index]
}
diff --git a/go.mod b/go.mod
index 542118f..d661125 100644
--- a/go.mod
+++ b/go.mod
@@ -47,6 +47,7 @@
github.com/pkg/errors v0.9.1
github.com/polarismesh/polaris-go v1.3.0
github.com/prometheus/client_golang v1.13.0
+ github.com/prometheus/common v0.37.0
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/sirupsen/logrus v1.7.0
github.com/stretchr/testify v1.8.2
diff --git a/imports/imports.go b/imports/imports.go
index c586896..71dd06b 100644
--- a/imports/imports.go
+++ b/imports/imports.go
@@ -64,6 +64,7 @@
_ "dubbo.apache.org/dubbo-go/v3/metadata/service/exporter/configurable"
_ "dubbo.apache.org/dubbo-go/v3/metadata/service/local"
_ "dubbo.apache.org/dubbo-go/v3/metadata/service/remote"
+ _ "dubbo.apache.org/dubbo-go/v3/metrics/app_info"
_ "dubbo.apache.org/dubbo-go/v3/metrics/prometheus"
_ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo"
_ "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3"
diff --git a/metadata/report/reporter_metric.go b/metadata/report/reporter_metric.go
new file mode 100644
index 0000000..5ca0995
--- /dev/null
+++ b/metadata/report/reporter_metric.go
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package report
+
+import (
+ "time"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/metadata/identifier"
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+ "dubbo.apache.org/dubbo-go/v3/metrics/metadata"
+)
+
+type PubMetricEventReport struct {
+ MetadataReport
+}
+
+func NewPubMetricEventReport(r MetadataReport) MetadataReport {
+ return &PubMetricEventReport{MetadataReport: r}
+}
+
+func (r *PubMetricEventReport) StoreProviderMetadata(i *identifier.MetadataIdentifier, s string) error {
+ event := metadata.NewMetadataMetricTimeEvent(metadata.StoreProvider)
+ err := r.MetadataReport.StoreProviderMetadata(i, s)
+ event.Succ = err == nil
+ event.End = time.Now()
+ event.Attachment[constant.InterfaceKey] = i.ServiceInterface
+ metrics.Publish(event)
+ return err
+}
+
+func (r *PubMetricEventReport) GetAppMetadata(i *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) {
+ event := metadata.NewMetadataMetricTimeEvent(metadata.MetadataSub)
+ info, err := r.MetadataReport.GetAppMetadata(i)
+ event.Succ = err == nil
+ event.End = time.Now()
+ metrics.Publish(event)
+ return info, err
+}
+
+func (r *PubMetricEventReport) PublishAppMetadata(i *identifier.SubscriberMetadataIdentifier, info *common.MetadataInfo) error {
+ event := metadata.NewMetadataMetricTimeEvent(metadata.MetadataPush)
+ err := r.MetadataReport.PublishAppMetadata(i, info)
+ event.Succ = err == nil
+ event.End = time.Now()
+ metrics.Publish(event)
+ return err
+}
diff --git a/metrics/api.go b/metrics/api.go
new file mode 100644
index 0000000..fdc69c1
--- /dev/null
+++ b/metrics/api.go
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metrics
+
+import (
+ "sync"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
+)
+
+var registries = make(map[string]func(*ReporterConfig) MetricRegistry)
+var collectors = make([]CollectorFunc, 0)
+var registry MetricRegistry
+
+// CollectorFunc used to extend more indicators
+type CollectorFunc func(MetricRegistry, *ReporterConfig)
+
+// Init sets up the metrics module. When config.Enable is set and a registry
+// factory has been registered under config.Protocol, it builds the registry,
+// runs every registered collector against it and then calls Export on it.
+// In every other case it does nothing.
+func Init(config *ReporterConfig) {
+	if !config.Enable {
+		return
+	}
+	// the default protocol is already set in MetricConfig
+	newRegistry, found := registries[config.Protocol]
+	if !found {
+		return
+	}
+	registry = newRegistry(config)
+	for i := range collectors {
+		collectors[i](registry, config)
+	}
+	registry.Export()
+}
+
+// SetRegistry extend more MetricRegistry, default PrometheusRegistry
+func SetRegistry(name string, v func(*ReporterConfig) MetricRegistry) {
+ registries[name] = v
+}
+
+// AddCollector registers a collector that adds more indicators, such as metadata, SLA or config center; name is currently unused.
+func AddCollector(name string, fun func(MetricRegistry, *ReporterConfig)) {
+ collectors = append(collectors, fun)
+}
+
+// MetricRegistry data container,data compute、expose、agg
+type MetricRegistry interface {
+ Counter(*MetricId) CounterMetric // add or update a counter
+ Gauge(*MetricId) GaugeMetric // add or update a gauge
+ Histogram(*MetricId) HistogramMetric // add a metric num to a histogram
+ Summary(*MetricId) SummaryMetric // add a metric num to a summary
+ Export() // expose metric data, such as Prometheus http exporter
+ // GetMetrics() []*MetricSample // get all metric data
+ // GetMetricsString() (string, error) // get text format metric data
+}
+
+// multi registry,like micrometer CompositeMeterRegistry
+// type CompositeRegistry struct {
+// rs []MetricRegistry
+// }
+
+// Type is the metric type, the same set as Micrometer's meter types.
+type Type uint8
+
+const (
+ Counter Type = iota
+ Gauge
+ LongTaskTimer
+ Timer
+ DistributionSummary
+ Other
+)
+
+// MetricId
+// # HELP dubbo_metadata_store_provider_succeed_total Succeed Store Provider Metadata
+// # TYPE dubbo_metadata_store_provider_succeed_total gauge
+// dubbo_metadata_store_provider_succeed_total{application_name="provider",hostname="localhost",interface="org.example.DemoService",ip="10.252.156.213",} 1.0
+// other properties except value
+type MetricId struct {
+ Name string
+ Desc string
+ Tags map[string]string
+ Type Type
+}
+
+func (m *MetricId) TagKeys() []string {
+ keys := make([]string, 0, len(m.Tags))
+ for k := range m.Tags {
+ keys = append(keys, k)
+ }
+ return keys
+}
+
+func NewMetricId(key *MetricKey, level MetricLevel) *MetricId {
+ return &MetricId{Name: key.Name, Desc: key.Desc, Tags: level.Tags()}
+}
+
+// MetricSample a metric sample,This is the final data presentation,
+// not an intermediate result(like summary,histogram they will export to a set of MetricSample)
+type MetricSample struct {
+ *MetricId
+ value float64
+}
+
+// CounterMetric counter metric
+type CounterMetric interface {
+ Inc()
+ Add(float64)
+}
+
+// GaugeMetric gauge metric
+type GaugeMetric interface {
+ Set(float64)
+ // Inc()
+ // Dec()
+ // Add(float64)
+ // Sub(float64)
+}
+
+// HistogramMetric histogram metric
+type HistogramMetric interface {
+ Record(float64)
+}
+
+// SummaryMetric summary metric
+type SummaryMetric interface {
+ Record(float64)
+}
+
+// StatesMetrics multi metrics,include total,success num, fail num,call MetricsRegistry save data
+type StatesMetrics interface {
+ Success()
+ AddSuccess(float64)
+ Fail()
+ AddFailed(float64)
+ Inc(succ bool)
+}
+
+func NewStatesMetrics(total *MetricId, succ *MetricId, fail *MetricId, reg MetricRegistry) StatesMetrics {
+ return &DefaultStatesMetric{total: total, succ: succ, fail: fail, r: reg}
+}
+
+type DefaultStatesMetric struct {
+ r MetricRegistry
+ total, succ, fail *MetricId
+}
+
+func (c DefaultStatesMetric) Inc(succ bool) {
+ if succ {
+ c.Success()
+ } else {
+ c.Fail()
+ }
+}
+func (c DefaultStatesMetric) Success() {
+ c.r.Counter(c.total).Inc()
+ c.r.Counter(c.succ).Inc()
+}
+
+func (c DefaultStatesMetric) AddSuccess(v float64) {
+ c.r.Counter(c.total).Add(v)
+ c.r.Counter(c.succ).Add(v)
+}
+
+func (c DefaultStatesMetric) Fail() {
+ c.r.Counter(c.total).Inc()
+ c.r.Counter(c.fail).Inc()
+}
+
+func (c DefaultStatesMetric) AddFailed(v float64) {
+ c.r.Counter(c.total).Add(v)
+ c.r.Counter(c.fail).Add(v)
+}
+
+// TimeMetric aggregates multiple metrics, including min (Gauge), max (Gauge), avg (Gauge), sum (Gauge) and last (Gauge), and calls MetricRegistry to expose them.
+// See dubbo-java org.apache.dubbo.metrics.aggregate.TimeWindowAggregator.
+type TimeMetric interface {
+ Record(float64)
+}
+
+const (
+ defaultBucketNum = 10
+ defalutTimeWindowSeconds = 120
+)
+
+// NewTimeMetric init and write all data to registry
+func NewTimeMetric(min, max, avg, sum, last *MetricId, mr MetricRegistry) TimeMetric {
+ return &DefaultTimeMetric{r: mr, min: min, max: max, avg: avg, sum: sum, last: last,
+ agg: aggregate.NewTimeWindowAggregator(defaultBucketNum, defalutTimeWindowSeconds)}
+}
+
+type DefaultTimeMetric struct {
+ r MetricRegistry
+ agg *aggregate.TimeWindowAggregator
+ min, max, avg, sum, last *MetricId
+}
+
+func (m *DefaultTimeMetric) Record(v float64) {
+ m.agg.Add(v)
+ result := m.agg.Result()
+ m.r.Gauge(m.max).Set(result.Max)
+ m.r.Gauge(m.min).Set(result.Min)
+ m.r.Gauge(m.avg).Set(result.Avg)
+ m.r.Gauge(m.sum).Set(result.Total)
+ m.r.Gauge(m.last).Set(v)
+}
+
+// metricsCache memoizes lazily created metric objects by key. Stateful metrics
+// such as TimeMetric must be cached so that the same instance is reused.
+var metricsCache map[string]interface{} = make(map[string]interface{})
+var metricsCacheMutex sync.RWMutex
+
+// ComputeIfAbsentCache returns the value cached under key, calling supplier to
+// create and store it when the key is absent. supplier runs at most once per
+// key, even when several goroutines miss at the same time.
+//
+// supplier is invoked while the cache's write lock is held, so it must not
+// call ComputeIfAbsentCache itself.
+func ComputeIfAbsentCache(key string, supplier func() interface{}) interface{} {
+	metricsCacheMutex.RLock()
+	v, ok := metricsCache[key]
+	metricsCacheMutex.RUnlock()
+	if ok {
+		return v
+	}
+
+	metricsCacheMutex.Lock()
+	defer metricsCacheMutex.Unlock()
+	// Re-check under the write lock: another goroutine may have stored the
+	// value between RUnlock and Lock, and overwriting it would discard the
+	// instance other callers already hold.
+	if v, ok = metricsCache[key]; ok {
+		return v
+	}
+	v = supplier()
+	metricsCache[key] = v
+	return v
+}
diff --git a/metrics/app_info/collector.go b/metrics/app_info/collector.go
new file mode 100644
index 0000000..a2bae02
--- /dev/null
+++ b/metrics/app_info/collector.go
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package app_info
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+)
+
+/*
+ * # HELP dubbo_application_info_total Total Application Info
+ * # TYPE dubbo_application_info_total counter
+ * dubbo_application_info_total{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="localhost",ip="127.0.0.1",} 1.0
+ */
+var info = metrics.NewMetricKey("dubbo_application_info_total", "Total Application Info") // Total Application Info include application name、version etc
+
+func init() {
+ metrics.AddCollector("application_info", func(mr metrics.MetricRegistry, config *metrics.ReporterConfig) {
+ mr.Counter(&metrics.MetricId{Name: info.Name, Desc: info.Desc, Tags: metrics.GetApplicationLevel().Tags()}).Inc()
+ })
+}
diff --git a/metrics/bus.go b/metrics/bus.go
new file mode 100644
index 0000000..333d636
--- /dev/null
+++ b/metrics/bus.go
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metrics
+
+import (
+ "sync"
+)
+
+// eventListener is a struct that encapsulates the listener map and provides thread-safe access to it.
+type eventListener struct {
+ mu sync.RWMutex
+ listener map[string]chan MetricsEvent
+}
+
+var listener = &eventListener{
+ listener: make(map[string]chan MetricsEvent),
+}
+
+// Publish publishes an event to all subscribers of the same type.
+func Publish(event MetricsEvent) {
+ listener.mu.RLock()
+ defer listener.mu.RUnlock()
+
+ if ch, ok := listener.listener[event.Type()]; ok {
+ select {
+ case ch <- event:
+ default:
+ // If the channel is full, drop the event to avoid blocking.
+ }
+ }
+}
+
+// Subscribe subscribes to events of the given type.
+func Subscribe(typ string, ch chan MetricsEvent) {
+ listener.mu.Lock()
+ defer listener.mu.Unlock()
+
+ listener.listener[typ] = ch
+}
+
+// Unsubscribe unsubscribes from events of the given type.
+func Unsubscribe(typ string) {
+ listener.mu.Lock()
+ defer listener.mu.Unlock()
+
+ if ch, ok := listener.listener[typ]; ok {
+ close(ch)
+ delete(listener.listener, typ)
+ }
+}
diff --git a/metrics/bus_test.go b/metrics/bus_test.go
new file mode 100644
index 0000000..185f508
--- /dev/null
+++ b/metrics/bus_test.go
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metrics
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+var mockChan = make(chan MetricsEvent, 16)
+
+type MockEvent struct {
+}
+
+func (m MockEvent) Type() string {
+ return "dubbo.metrics.mock"
+}
+
+func NewEmptyMockEvent() *MockEvent {
+ return &MockEvent{}
+}
+
+func init() {
+ Subscribe("dubbo.metrics.mock", mockChan)
+ Publish(NewEmptyMockEvent())
+}
+
+// TestBusPublish checks that the event published in init is delivered to the
+// channel subscribed for its type.
+func TestBusPublish(t *testing.T) {
+	t.Run("testBusPublish", func(t *testing.T) {
+		// init publishes one event built by NewEmptyMockEvent into mockChan.
+		event := <-mockChan
+
+		// NewEmptyMockEvent returns a pointer, so the delivered event is a
+		// *MockEvent; asserting the value type MockEvent would never match
+		// and the comparison would be skipped.
+		mock, ok := event.(*MockEvent)
+		assert.True(t, ok)
+		assert.Equal(t, NewEmptyMockEvent(), mock)
+	})
+}
diff --git a/metrics/common.go b/metrics/common.go
new file mode 100644
index 0000000..f0ce9cf
--- /dev/null
+++ b/metrics/common.go
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metrics
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+)
+
+type MetricKey struct {
+ Name string
+ Desc string
+}
+
+func NewMetricKey(name string, desc string) *MetricKey {
+ return &MetricKey{Name: name, Desc: desc}
+}
+
+type MetricLevel interface {
+ Tags() map[string]string
+}
+
+type ApplicationMetricLevel struct {
+ ApplicationName string
+ Version string
+ GitCommitId string
+ Ip string
+ HostName string
+}
+
+var applicationName string
+var applicationVersion string
+
+// cannot import rootConfig,may cause cycle import,so be it
+func InitAppInfo(appName string, appVersion string) {
+ applicationName = appName
+ applicationVersion = appVersion
+}
+
+func GetApplicationLevel() *ApplicationMetricLevel {
+ return &ApplicationMetricLevel{
+ ApplicationName: applicationName,
+ Version: applicationVersion,
+ Ip: common.GetLocalIp(),
+ HostName: common.GetLocalHostName(),
+ GitCommitId: "",
+ }
+}
+
+// Tags returns the application-level metric labels: ip, hostname, application
+// name, application version and git commit id. A new map is built on every
+// call, so callers may add their own entries to it.
+func (m *ApplicationMetricLevel) Tags() map[string]string {
+	return map[string]string{
+		constant.IpKey:       m.Ip,
+		constant.HostnameKey: m.HostName,
+		// The label is "application_name", matching the sample metric output
+		// documented alongside the collectors.
+		constant.ApplicationNameKey:    m.ApplicationName,
+		constant.ApplicationVersionKey: m.Version,
+		constant.GitCommitIdKey:        m.GitCommitId,
+	}
+}
+
+type ServiceMetricLevel struct {
+ *ApplicationMetricLevel
+ Interface string
+}
+
+func NewServiceMetric(interfaceName string) *ServiceMetricLevel {
+ return &ServiceMetricLevel{ApplicationMetricLevel: GetApplicationLevel(), Interface: interfaceName}
+}
+
+func (m ServiceMetricLevel) Tags() map[string]string {
+ tags := m.ApplicationMetricLevel.Tags()
+ tags[constant.InterfaceKey] = m.Interface
+ return tags
+}
+
+type MethodMetricLevel struct {
+ *ServiceMetricLevel
+ Method string
+ Group string
+ Version string
+}
+
+func (m MethodMetricLevel) Tags() map[string]string {
+ tags := m.ServiceMetricLevel.Tags()
+ tags[constant.MethodKey] = m.Method
+ tags[constant.GroupKey] = m.Group
+ tags[constant.VersionKey] = m.Version
+ return tags
+}
+
+type ConfigCenterLevel struct {
+ ApplicationName string
+ Ip string
+ HostName string
+ Key string
+ Group string
+ ConfigCenter string
+ ChangeType string
+}
+
+func NewConfigCenterLevel(key string, group string, configCenter string, changeType string) *ConfigCenterLevel {
+ return &ConfigCenterLevel{
+ ApplicationName: applicationName,
+ Ip: common.GetLocalIp(),
+ HostName: common.GetLocalHostName(),
+ Key: key,
+ Group: group,
+ ConfigCenter: configCenter,
+ ChangeType: changeType,
+ }
+}
+
+// Tags returns the config-center metric labels: application name, ip,
+// hostname, config key, group, config center type and change type. A new map
+// is built on every call.
+func (l ConfigCenterLevel) Tags() map[string]string {
+	tags := make(map[string]string, 7)
+	// The label is "application_name", matching the sample metric output
+	// documented for the other dubbo metrics.
+	tags[constant.ApplicationNameKey] = l.ApplicationName
+	tags[constant.IpKey] = l.Ip
+	tags[constant.HostnameKey] = l.HostName
+	tags[constant.KeyKey] = l.Key
+	tags[constant.GroupKey] = l.Group
+	tags[constant.ConfigCenterKey] = l.ConfigCenter
+	tags[constant.ChangeTypeKey] = l.ChangeType
+	return tags
+}
diff --git a/metrics/config_center/collector.go b/metrics/config_center/collector.go
new file mode 100644
index 0000000..7236501
--- /dev/null
+++ b/metrics/config_center/collector.go
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metrics
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+)
+
+const eventType = constant.MetricApp
+
+var ch = make(chan metrics.MetricsEvent, 10)
+var info = metrics.NewMetricKey("dubbo_configcenter_total", "Config Changed Total")
+
+// init registers the config-center collector with the metrics module. When
+// the collector function is invoked with a registry, it creates a
+// configCenterCollector bound to that registry and starts it.
+func init() {
+	// Registered under its own name rather than the app_info collector's.
+	metrics.AddCollector("config_center", func(mr metrics.MetricRegistry, config *metrics.ReporterConfig) {
+		c := &configCenterCollector{r: mr}
+		c.start()
+	})
+}
+
+type configCenterCollector struct {
+ r metrics.MetricRegistry
+}
+
+func (c *configCenterCollector) start() {
+ metrics.Subscribe(eventType, ch)
+ go func() {
+ for e := range ch {
+ if event, ok := e.(*ConfigCenterMetricEvent); ok {
+ c.handleDataChange(event)
+ }
+ }
+ }()
+}
+
+func (c *configCenterCollector) handleDataChange(event *ConfigCenterMetricEvent) {
+ id := metrics.NewMetricId(info, metrics.NewConfigCenterLevel(event.key, event.group, event.configCenter, event.getChangeType()))
+ c.r.Counter(id).Add(event.size)
+}
+
+const (
+ Nacos = "nacos"
+ Apollo = "apollo"
+ Zookeeper = "zookeeper"
+)
+
+type ConfigCenterMetricEvent struct {
+ // Name MetricName
+ key string
+ group string
+ configCenter string
+ changeType remoting.EventType
+ size float64
+}
+
+func (e *ConfigCenterMetricEvent) getChangeType() string {
+ switch e.changeType {
+ case remoting.EventTypeAdd:
+ return "added"
+ case remoting.EventTypeDel:
+ return "deleted"
+ case remoting.EventTypeUpdate:
+ return "modified"
+ default:
+ return ""
+ }
+}
+
+func (*ConfigCenterMetricEvent) Type() string {
+ return eventType
+}
+
+func NewIncMetricEvent(key, group string, changeType remoting.EventType, c string) *ConfigCenterMetricEvent {
+ return &ConfigCenterMetricEvent{key: key, group: group, changeType: changeType, configCenter: c, size: 1}
+}
diff --git a/metrics/event.go b/metrics/event.go
new file mode 100644
index 0000000..f78589a
--- /dev/null
+++ b/metrics/event.go
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metrics
+
+// MetricsEvent represents an event that can be published and subscribed to.
+type MetricsEvent interface {
+ Type() string
+}
diff --git a/metrics/metadata/collector.go b/metrics/metadata/collector.go
new file mode 100644
index 0000000..16740d3
--- /dev/null
+++ b/metrics/metadata/collector.go
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metadata
+
+import (
+ "time"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+)
+
+// eventType is the event type string of this package: it is passed to
+// metrics.Subscribe and returned by MetadataMetricEvent.Type.
+const eventType = constant.MetricsMetadata
+
+// ch is the buffered channel (capacity 10) from which the collector reads events.
+var ch = make(chan metrics.MetricsEvent, 10)
+
+// init registers the "metadata" collector factory with the metrics package.
+// The factory wraps the supplied registry in a MetadataMetricCollector and
+// starts it; the reporter config argument is not used.
+func init() {
+	startCollector := func(mr metrics.MetricRegistry, rc *metrics.ReporterConfig) {
+		collector := &MetadataMetricCollector{r: mr}
+		collector.start()
+	}
+	metrics.AddCollector("metadata", startCollector)
+}
+
+// MetadataMetricCollector turns MetadataMetricEvent values into metrics.
+type MetadataMetricCollector struct {
+ // r is the registry handed to the metric constructors by every handler.
+ r metrics.MetricRegistry
+}
+
+// start subscribes ch to events of type eventType and spawns a goroutine that
+// forwards every received *MetadataMetricEvent to the handler matching its
+// Name. Values of another type, or with a Name that has no handler, are
+// ignored. The goroutine ends when ch is closed.
+func (c *MetadataMetricCollector) start() {
+	metrics.Subscribe(eventType, ch)
+	go func() {
+		for received := range ch {
+			event, ok := received.(*MetadataMetricEvent)
+			if !ok {
+				continue
+			}
+			switch event.Name {
+			case StoreProvider:
+				c.handleStoreProvider(event)
+			case MetadataPush:
+				c.handleMetadataPush(event)
+			case MetadataSub:
+				c.handleMetadataSub(event)
+			case SubscribeServiceRt:
+				c.handleSubscribeService(event)
+			}
+		}
+	}()
+}
+
+// handleMetadataPush updates the application-level push metrics for event:
+// the total/succeed/failed states metric is incremented with event.Succ and
+// the elapsed time in milliseconds is recorded on the push time metric. Both
+// metrics are obtained through metrics.ComputeIfAbsentCache.
+func (c *MetadataMetricCollector) handleMetadataPush(event *MetadataMetricEvent) {
+	newStates := func() interface{} {
+		return newStatesMetricFunc(metadataPushNum, metadataPushNumSucceed, metadataPushNumFailed, metrics.GetApplicationLevel(), c.r)
+	}
+	states := metrics.ComputeIfAbsentCache(dubboMetadataPush, newStates).(metrics.StatesMetrics)
+	states.Inc(event.Succ)
+
+	newRt := func() interface{} {
+		return newTimeMetrics(pushRtMin, pushRtMax, pushRtAvg, pushRtSum, pushRtLast, metrics.GetApplicationLevel(), c.r)
+	}
+	rt := metrics.ComputeIfAbsentCache(dubboPushRt, newRt).(metrics.TimeMetric)
+	rt.Record(event.CostMs())
+}
+
+// handleMetadataSub updates the application-level subscribe metrics for event:
+// the total/succeed/failed states metric is incremented with event.Succ and
+// the elapsed time in milliseconds is recorded on the subscribe time metric.
+// Both metrics are obtained through metrics.ComputeIfAbsentCache.
+func (c *MetadataMetricCollector) handleMetadataSub(event *MetadataMetricEvent) {
+	newStates := func() interface{} {
+		return newStatesMetricFunc(metadataSubNum, metadataSubNumSucceed, metadataSubNumFailed, metrics.GetApplicationLevel(), c.r)
+	}
+	states := metrics.ComputeIfAbsentCache(dubboMetadataSubscribe, newStates).(metrics.StatesMetrics)
+	states.Inc(event.Succ)
+
+	newRt := func() interface{} {
+		return newTimeMetrics(subscribeRtMin, subscribeRtMax, subscribeRtAvg, subscribeRtSum, subscribeRtLast, metrics.GetApplicationLevel(), c.r)
+	}
+	rt := metrics.ComputeIfAbsentCache(dubboSubscribeRt, newRt).(metrics.TimeMetric)
+	rt.Record(event.CostMs())
+}
+
+// handleStoreProvider updates the service-level store-provider metrics for
+// event. The interface name is read from the event attachment and is part of
+// both cache keys, so each interface gets its own states and time metrics.
+func (c *MetadataMetricCollector) handleStoreProvider(event *MetadataMetricEvent) {
+	service := event.Attachment[constant.InterfaceKey]
+
+	statesKey := dubboMetadataStoreProvider + ":" + service
+	states := metrics.ComputeIfAbsentCache(statesKey, func() interface{} {
+		return newStatesMetricFunc(metadataStoreProvider, metadataStoreProviderSucceed, metadataStoreProviderFailed,
+			metrics.NewServiceMetric(service), c.r)
+	}).(metrics.StatesMetrics)
+	states.Inc(event.Succ)
+
+	rtKey := dubboStoreProviderInterfaceRt + ":" + service
+	rt := metrics.ComputeIfAbsentCache(rtKey, func() interface{} {
+		return newTimeMetrics(storeProviderInterfaceRtMin, storeProviderInterfaceRtMax, storeProviderInterfaceRtAvg,
+			storeProviderInterfaceRtSum, storeProviderInterfaceRtLast, metrics.NewServiceMetric(service), c.r)
+	}).(metrics.TimeMetric)
+	rt.Record(event.CostMs())
+}
+
+// handleSubscribeService records the elapsed time of event, in milliseconds,
+// on the service-level subscribe time metric of the interface named in the
+// event attachment.
+func (c *MetadataMetricCollector) handleSubscribeService(event *MetadataMetricEvent) {
+	service := event.Attachment[constant.InterfaceKey]
+	rtKey := dubboSubscribeServiceRt + ":" + service
+	rt := metrics.ComputeIfAbsentCache(rtKey, func() interface{} {
+		return newTimeMetrics(subscribeServiceRtMin, subscribeServiceRtMax, subscribeServiceRtAvg, subscribeServiceRtSum,
+			subscribeServiceRtLast, metrics.NewServiceMetric(service), c.r)
+	}).(metrics.TimeMetric)
+	rt.Record(event.CostMs())
+}
+
+// newStatesMetricFunc builds a metrics.StatesMetrics from the total, succeed
+// and failed metric keys, each bound to level, backed by reg.
+func newStatesMetricFunc(total *metrics.MetricKey, succ *metrics.MetricKey, fail *metrics.MetricKey,
+	level metrics.MetricLevel, reg metrics.MetricRegistry) metrics.StatesMetrics {
+	totalID := metrics.NewMetricId(total, level)
+	succID := metrics.NewMetricId(succ, level)
+	failID := metrics.NewMetricId(fail, level)
+	return metrics.NewStatesMetrics(totalID, succID, failID, reg)
+}
+
+// newTimeMetrics builds a metrics.TimeMetric from the min, max, avg, sum and
+// last metric keys, each bound to level, backed by mr.
+func newTimeMetrics(min, max, avg, sum, last *metrics.MetricKey, level metrics.MetricLevel, mr metrics.MetricRegistry) metrics.TimeMetric {
+	minID := metrics.NewMetricId(min, level)
+	maxID := metrics.NewMetricId(max, level)
+	avgID := metrics.NewMetricId(avg, level)
+	sumID := metrics.NewMetricId(sum, level)
+	lastID := metrics.NewMetricId(last, level)
+	return metrics.NewTimeMetric(minID, maxID, avgID, sumID, lastID, mr)
+}
+
+// MetadataMetricEvent is the event consumed by MetadataMetricCollector.
+type MetadataMetricEvent struct {
+ // Name selects the handler the collector dispatches the event to.
+ Name MetricName
+ // Succ is passed to StatesMetrics.Inc by the handlers that count outcomes.
+ Succ bool
+ // Start is set to time.Now() by NewMetadataMetricTimeEvent.
+ Start time.Time
+ // End is not set by the constructor; CostMs computes End minus Start.
+ End time.Time
+ // Attachment carries extra values; handlers read constant.InterfaceKey from it.
+ Attachment map[string]string
+}
+
+// Type returns eventType, the same string the collector subscribes with.
+func (*MetadataMetricEvent) Type() string {
+ return eventType
+}
+
+// CostMs returns the time between Start and End as a fractional number of
+// milliseconds.
+func (e *MetadataMetricEvent) CostMs() float64 {
+	elapsed := e.End.Sub(e.Start)
+	return float64(elapsed) / float64(time.Millisecond)
+}
+
+// NewMetadataMetricTimeEvent returns an event named n whose Start is the
+// current time and whose Attachment is an empty, writable map. End and Succ
+// are left at their zero values.
+func NewMetadataMetricTimeEvent(n MetricName) *MetadataMetricEvent {
+	event := &MetadataMetricEvent{Name: n}
+	event.Start = time.Now()
+	event.Attachment = map[string]string{}
+	return event
+}
diff --git a/metrics/metadata/metric_set.go b/metrics/metadata/metric_set.go
new file mode 100644
index 0000000..e7ade6e
--- /dev/null
+++ b/metrics/metadata/metric_set.go
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metadata
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+)
+
+// MetricName identifies the kind of a MetadataMetricEvent; the collector
+// switches on it to choose a handler.
+type MetricName int8
+
+const (
+ // MetadataPush events are handled by handleMetadataPush.
+ MetadataPush MetricName = iota
+ // MetadataSub events are handled by handleMetadataSub.
+ MetadataSub
+ // StoreProvider events are handled by handleStoreProvider.
+ StoreProvider
+ // PushRt, SubscribeRt and StoreProviderInterfaceRt are not separate names:
+ // those timings are recorded by the three handlers above.
+ // SubscribeServiceRt events are handled by handleSubscribeService.
+ SubscribeServiceRt
+)
+
+// Base metric names. They are used as keys (or key prefixes) for
+// metrics.ComputeIfAbsentCache in the collector and as the prefix of the
+// metric keys declared in this file.
+const (
+ dubboMetadataPush = "dubbo_metadata_push_num"
+ dubboPushRt = "dubbo_push_rt_milliseconds"
+ dubboMetadataSubscribe = "dubbo_metadata_subscribe_num"
+ dubboSubscribeRt = "dubbo_subscribe_rt_milliseconds"
+ dubboMetadataStoreProvider = "dubbo_metadata_store_provider"
+ dubboStoreProviderInterfaceRt = "dubbo_store_provider_interface_rt_milliseconds"
+ dubboSubscribeServiceRt = "dubbo_subscribe_service_rt_milliseconds"
+)
+
+// Suffixes appended to a base metric name to form the full metric name.
+const (
+ // Suffixes of the total / succeed / failed counters.
+ totalSuffix = "_total"
+ succSuffix = "_succeed_total"
+ failedSuffix = "_failed_total"
+ // Suffixes of the five values kept by a time metric.
+ sumSuffix = "_sum"
+ lastSuffix = "_last"
+ minSuffix = "_min"
+ maxSuffix = "_max"
+ avgSuffix = "_avg"
+)
+
+// Metric keys (name plus help text) used by the metadata collector.
+var (
+ // app level: metadata push counters
+ metadataPushNum = metrics.NewMetricKey(dubboMetadataPush+totalSuffix, "Total Num")
+ metadataPushNumSucceed = metrics.NewMetricKey(dubboMetadataPush+succSuffix, "Succeed Push Num")
+ metadataPushNumFailed = metrics.NewMetricKey(dubboMetadataPush+failedSuffix, "Failed Push Num")
+ // app level: metadata subscribe counters
+ metadataSubNum = metrics.NewMetricKey(dubboMetadataSubscribe+totalSuffix, "Total Metadata Subscribe Num")
+ metadataSubNumSucceed = metrics.NewMetricKey(dubboMetadataSubscribe+succSuffix, "Succeed Metadata Subscribe Num")
+ metadataSubNumFailed = metrics.NewMetricKey(dubboMetadataSubscribe+failedSuffix, "Failed Metadata Subscribe Num")
+ // app level: metadata push time
+ pushRtSum = metrics.NewMetricKey(dubboPushRt+sumSuffix, "Sum Response Time")
+ pushRtLast = metrics.NewMetricKey(dubboPushRt+lastSuffix, "Last Response Time")
+ pushRtMin = metrics.NewMetricKey(dubboPushRt+minSuffix, "Min Response Time")
+ pushRtMax = metrics.NewMetricKey(dubboPushRt+maxSuffix, "Max Response Time")
+ pushRtAvg = metrics.NewMetricKey(dubboPushRt+avgSuffix, "Average Response Time")
+ // app level: metadata subscribe time
+ subscribeRtSum = metrics.NewMetricKey(dubboSubscribeRt+sumSuffix, "Sum Response Time")
+ subscribeRtLast = metrics.NewMetricKey(dubboSubscribeRt+lastSuffix, "Last Response Time")
+ subscribeRtMin = metrics.NewMetricKey(dubboSubscribeRt+minSuffix, "Min Response Time")
+ subscribeRtMax = metrics.NewMetricKey(dubboSubscribeRt+maxSuffix, "Max Response Time")
+ subscribeRtAvg = metrics.NewMetricKey(dubboSubscribeRt+avgSuffix, "Average Response Time")
+
+ /*
+ # HELP dubbo_metadata_store_provider_succeed_total Succeed Store Provider Metadata
+ # TYPE dubbo_metadata_store_provider_succeed_total gauge
+ dubbo_metadata_store_provider_succeed_total{application_name="metrics-provider",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService2",ip="10.252.156.213",} 1.0
+ dubbo_metadata_store_provider_succeed_total{application_name="metrics-provider",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="10.252.156.213",} 1.0
+ */
+ // service level: store provider counters (one series per interface)
+ metadataStoreProviderFailed = metrics.NewMetricKey(dubboMetadataStoreProvider+failedSuffix, "Total Failed Provider Metadata Store")
+ metadataStoreProviderSucceed = metrics.NewMetricKey(dubboMetadataStoreProvider+succSuffix, "Total Succeed Provider Metadata Store")
+ metadataStoreProvider = metrics.NewMetricKey(dubboMetadataStoreProvider+totalSuffix, "Total Provider Metadata Store")
+
+ /*
+ # HELP dubbo_store_provider_interface_rt_milliseconds_avg Average Response Time
+ # TYPE dubbo_store_provider_interface_rt_milliseconds_avg gauge
+ dubbo_store_provider_interface_rt_milliseconds_avg{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="10.252.156.213",} 504.0
+ dubbo_store_provider_interface_rt_milliseconds_avg{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService2",ip="10.252.156.213",} 10837.0
+ */
+ // service level: store provider time
+ storeProviderInterfaceRtAvg = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+avgSuffix, "Average Store Provider Interface Time")
+ storeProviderInterfaceRtLast = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+lastSuffix, "Last Store Provider Interface Time")
+ storeProviderInterfaceRtMax = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+maxSuffix, "Max Store Provider Interface Time")
+ storeProviderInterfaceRtMin = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+minSuffix, "Min Store Provider Interface Time")
+ storeProviderInterfaceRtSum = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+sumSuffix, "Sum Store Provider Interface Time")
+
+ // service level: subscribe service time
+ subscribeServiceRtLast = metrics.NewMetricKey(dubboSubscribeServiceRt+lastSuffix, "Last Subscribe Service Time")
+ subscribeServiceRtMax = metrics.NewMetricKey(dubboSubscribeServiceRt+maxSuffix, "Max Subscribe Service Time")
+ subscribeServiceRtMin = metrics.NewMetricKey(dubboSubscribeServiceRt+minSuffix, "Min Subscribe Service Time")
+ subscribeServiceRtSum = metrics.NewMetricKey(dubboSubscribeServiceRt+sumSuffix, "Sum Subscribe Service Time")
+ subscribeServiceRtAvg = metrics.NewMetricKey(dubboSubscribeServiceRt+avgSuffix, "Average Subscribe Service Time")
+)
diff --git a/metrics/prometheus/registry.go b/metrics/prometheus/registry.go
new file mode 100644
index 0000000..70946d6
--- /dev/null
+++ b/metrics/prometheus/registry.go
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package prometheus
+
+import (
+ "bytes"
+ "sync"
+)
+
+import (
+ prom "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
+ "github.com/prometheus/common/expfmt"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+)
+
+// init registers the "prometheus" registry factory with the metrics package.
+// The factory returns a promMetricRegistry with all four vector maps
+// allocated; the reporter config argument is not used.
+func init() {
+	newRegistry := func(rc *metrics.ReporterConfig) metrics.MetricRegistry {
+		reg := new(promMetricRegistry)
+		reg.cvm = map[string]*prom.CounterVec{}
+		reg.gvm = map[string]*prom.GaugeVec{}
+		reg.hvm = map[string]*prom.HistogramVec{}
+		reg.svm = map[string]*prom.SummaryVec{}
+		return reg
+	}
+	metrics.SetRegistry("prometheus", newRegistry)
+}
+
+// promMetricRegistry is the prometheus-backed metrics.MetricRegistry. It keeps
+// one prometheus vector per metric name and per metric kind.
+type promMetricRegistry struct {
+ mtx sync.RWMutex // Protects the four maps below.
+ cvm map[string]*prom.CounterVec // counter vectors keyed by metric name
+ gvm map[string]*prom.GaugeVec // gauge vectors keyed by metric name
+ hvm map[string]*prom.HistogramVec // histogram vectors keyed by metric name
+ svm map[string]*prom.SummaryVec // summary vectors keyed by metric name
+}
+
+// Counter returns a CounterMetric for m. The prom.CounterVec named m.Name is
+// created with m.TagKeys() as label names the first time the name is seen and
+// reused afterwards; the returned metric is the child selected by m.Tags.
+func (p *promMetricRegistry) Counter(m *metrics.MetricId) metrics.CounterMetric {
+	p.mtx.RLock()
+	vec, ok := p.cvm[m.Name]
+	p.mtx.RUnlock()
+	if !ok {
+		p.mtx.Lock()
+		// Re-check under the write lock: another goroutine may have created the
+		// vector between RUnlock and Lock, and promauto panics when the same
+		// metric name is registered twice.
+		if vec, ok = p.cvm[m.Name]; !ok {
+			vec = promauto.NewCounterVec(prom.CounterOpts{
+				Name: m.Name,
+				Help: m.Desc,
+			}, m.TagKeys())
+			p.cvm[m.Name] = vec
+		}
+		p.mtx.Unlock()
+	}
+	c := vec.With(m.Tags)
+	return &counter{pc: c}
+}
+
+// Gauge returns a GaugeMetric for m. The prom.GaugeVec named m.Name is created
+// with m.TagKeys() as label names the first time the name is seen and reused
+// afterwards; the returned metric is the child selected by m.Tags.
+func (p *promMetricRegistry) Gauge(m *metrics.MetricId) metrics.GaugeMetric {
+	p.mtx.RLock()
+	vec, ok := p.gvm[m.Name]
+	p.mtx.RUnlock()
+	if !ok {
+		p.mtx.Lock()
+		// Re-check under the write lock: another goroutine may have created the
+		// vector between RUnlock and Lock, and promauto panics when the same
+		// metric name is registered twice.
+		if vec, ok = p.gvm[m.Name]; !ok {
+			vec = promauto.NewGaugeVec(prom.GaugeOpts{
+				Name: m.Name,
+				Help: m.Desc,
+			}, m.TagKeys())
+			p.gvm[m.Name] = vec
+		}
+		p.mtx.Unlock()
+	}
+	g := vec.With(m.Tags)
+	return &gauge{pg: g}
+}
+
+// Histogram returns a HistogramMetric for m. The prom.HistogramVec named
+// m.Name is created with m.TagKeys() as label names the first time the name is
+// seen and reused afterwards; the returned metric is the child selected by
+// m.Tags.
+func (p *promMetricRegistry) Histogram(m *metrics.MetricId) metrics.HistogramMetric {
+	p.mtx.RLock()
+	vec, ok := p.hvm[m.Name]
+	p.mtx.RUnlock()
+	if !ok {
+		p.mtx.Lock()
+		// Re-check under the write lock: another goroutine may have created the
+		// vector between RUnlock and Lock, and promauto panics when the same
+		// metric name is registered twice.
+		if vec, ok = p.hvm[m.Name]; !ok {
+			vec = promauto.NewHistogramVec(prom.HistogramOpts{
+				Name: m.Name,
+				Help: m.Desc,
+			}, m.TagKeys())
+			p.hvm[m.Name] = vec
+		}
+		p.mtx.Unlock()
+	}
+	h := vec.With(m.Tags)
+	return &histogram{ph: h.(prom.Histogram)}
+}
+
+// Summary returns a SummaryMetric for m. The prom.SummaryVec named m.Name is
+// created with m.TagKeys() as label names the first time the name is seen and
+// reused afterwards; the returned metric is the child selected by m.Tags.
+func (p *promMetricRegistry) Summary(m *metrics.MetricId) metrics.SummaryMetric {
+	p.mtx.RLock()
+	vec, ok := p.svm[m.Name]
+	p.mtx.RUnlock()
+	if !ok {
+		p.mtx.Lock()
+		// Re-check under the write lock: another goroutine may have created the
+		// vector between RUnlock and Lock, and promauto panics when the same
+		// metric name is registered twice.
+		if vec, ok = p.svm[m.Name]; !ok {
+			vec = promauto.NewSummaryVec(prom.SummaryOpts{
+				Name: m.Name,
+				Help: m.Desc,
+			}, m.TagKeys())
+			p.svm[m.Name] = vec
+		}
+		p.mtx.Unlock()
+	}
+	s := vec.With(m.Tags)
+	return &summary{ps: s.(prom.Summary)}
+}
+
+// Export is currently a no-op.
+func (p *promMetricRegistry) Export() {
+
+}
+
+// Scrape gathers every metric family from the default prometheus registry and
+// returns them rendered in the prometheus text format. The first gather or
+// encoding error is returned with an empty string.
+func (p *promMetricRegistry) Scrape() (string, error) {
+	// The vectors are registered through promauto, i.e. on
+	// prom.DefaultRegisterer, so gather from it whenever it can gather (the
+	// stock *prom.Registry can). A replaced or wrapped registerer that cannot
+	// gather falls back to prom.DefaultGatherer instead of panicking on a
+	// failed type assertion.
+	gatherer, ok := prom.DefaultRegisterer.(prom.Gatherer)
+	if !ok {
+		gatherer = prom.DefaultGatherer
+	}
+	families, err := gatherer.Gather()
+	if err != nil {
+		return "", err
+	}
+	out := &bytes.Buffer{}
+	for _, mf := range families {
+		if _, err := expfmt.MetricFamilyToText(out, mf); err != nil {
+			return "", err
+		}
+	}
+	return out.String(), nil
+}
+
+// counter adapts a prom.Counter to the counter metric returned by
+// promMetricRegistry.Counter.
+type counter struct {
+ pc prom.Counter
+}
+
+// Inc forwards to the wrapped prom.Counter's Inc.
+func (c *counter) Inc() {
+ c.pc.Inc()
+}
+
+// Add forwards v to the wrapped prom.Counter's Add.
+func (c *counter) Add(v float64) {
+ c.pc.Add(v)
+}
+
+// gauge adapts a prom.Gauge to the gauge metric returned by
+// promMetricRegistry.Gauge. Only Set is exposed; the other prom.Gauge
+// operations are kept below as commented-out code.
+type gauge struct {
+ pg prom.Gauge
+}
+
+// func (g *gauge) Inc() {
+// g.pg.Inc()
+// }
+//
+// func (g *gauge) Dec() {
+// g.pg.Dec()
+// }
+
+// Set forwards v to the wrapped prom.Gauge's Set.
+func (g *gauge) Set(v float64) {
+ g.pg.Set(v)
+}
+
+// func (g *gauge) Add(v float64) {
+// g.pg.Add(v)
+// }
+// func (g *gauge) Sub(v float64) {
+// g.pg.Sub(v)
+// }
+
+// histogram adapts a prom.Histogram to the histogram metric returned by
+// promMetricRegistry.Histogram.
+type histogram struct {
+ ph prom.Histogram
+}
+
+// Record forwards v to the wrapped prom.Histogram's Observe.
+func (h *histogram) Record(v float64) {
+ h.ph.Observe(v)
+}
+
+// summary adapts a prom.Summary to the summary metric returned by
+// promMetricRegistry.Summary.
+type summary struct {
+ ps prom.Summary
+}
+
+// Record forwards v to the wrapped prom.Summary's Observe.
+func (s *summary) Record(v float64) {
+ s.ps.Observe(v)
+}
diff --git a/metrics/registry/collector.go b/metrics/registry/collector.go
new file mode 100644
index 0000000..7479fd2
--- /dev/null
+++ b/metrics/registry/collector.go
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package registry
+
+import (
+ "time"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+)
+
+var (
+ // registryChan is the buffered channel (capacity 128) from which the
+ // registry collector reads events.
+ registryChan = make(chan metrics.MetricsEvent, 128)
+)
+
+func init() {
+ metrics.AddCollector("registry", func(m metrics.MetricRegistry, c *metrics.ReporterConfig) {
+ rc := ®istryCollector{regRegistry: m}
+ go rc.start()
+ })
+}
+
+// registryCollector is the registry's metrics collector
+type registryCollector struct {
+ // regRegistry is the registry every handler creates its metrics on.
+ regRegistry metrics.MetricRegistry
+}
+
+// start subscribes registryChan to registry metrics events and then blocks,
+// forwarding every received *RegistryMetricsEvent to the handler for its Name
+// until the channel is closed. Values of another type are skipped.
+// NOTE(review): there is no case for Directory, so directoryHandler is never
+// reached from this loop; confirm whether that is intentional.
+func (rc *registryCollector) start() {
+	metrics.Subscribe(constant.MetricsRegistry, registryChan)
+	for received := range registryChan {
+		event, ok := received.(*RegistryMetricsEvent)
+		if !ok {
+			continue
+		}
+		switch event.Name {
+		case Reg:
+			rc.regHandler(event)
+		case Sub:
+			rc.subHandler(event)
+		case Notify:
+			rc.notifyHandler(event)
+		case ServerReg:
+			rc.serverRegHandler(event)
+		case ServerSub:
+			rc.serverSubHandler(event)
+		}
+	}
+}
+
+// newStatesMetricFunc builds a metrics.StatesMetrics from the total, succeed
+// and failed metric keys, each bound to level, backed by reg.
+func newStatesMetricFunc(total *metrics.MetricKey, succ *metrics.MetricKey, fail *metrics.MetricKey,
+	level metrics.MetricLevel, reg metrics.MetricRegistry) metrics.StatesMetrics {
+	totalID := metrics.NewMetricId(total, level)
+	succID := metrics.NewMetricId(succ, level)
+	failID := metrics.NewMetricId(fail, level)
+	return metrics.NewStatesMetrics(totalID, succID, failID, reg)
+}
+
+// newTimeMetrics builds a metrics.TimeMetric from the min, max, avg, sum and
+// last metric keys, each bound to level, backed by mr.
+func newTimeMetrics(min, max, avg, sum, last *metrics.MetricKey, level metrics.MetricLevel, mr metrics.MetricRegistry) metrics.TimeMetric {
+	minID := metrics.NewMetricId(min, level)
+	maxID := metrics.NewMetricId(max, level)
+	avgID := metrics.NewMetricId(avg, level)
+	sumID := metrics.NewMetricId(sum, level)
+	lastID := metrics.NewMetricId(last, level)
+	return metrics.NewTimeMetric(minID, maxID, avgID, sumID, lastID, mr)
+}
+
+// regHandler handles register events: it increments the application-level
+// register states metric with event.Succ and records the elapsed time in
+// milliseconds on the register time metric. Both metrics are obtained through
+// metrics.ComputeIfAbsentCache.
+func (rc *registryCollector) regHandler(event *RegistryMetricsEvent) {
+	newStates := func() interface{} {
+		return newStatesMetricFunc(RegisterMetricRequests, RegisterMetricRequestsSucceed, RegisterMetricRequestsFailed, metrics.GetApplicationLevel(), rc.regRegistry)
+	}
+	states := metrics.ComputeIfAbsentCache(dubboRegNum, newStates).(metrics.StatesMetrics)
+	states.Inc(event.Succ)
+
+	newRt := func() interface{} {
+		return newTimeMetrics(RegisterRtMillisecondsMin, RegisterRtMillisecondsMax, RegisterRtMillisecondsAvg, RegisterRtMillisecondsSum, RegisterRtMillisecondsLast, metrics.GetApplicationLevel(), rc.regRegistry)
+	}
+	rt := metrics.ComputeIfAbsentCache(dubboRegRt, newRt).(metrics.TimeMetric)
+	rt.Record(event.CostMs())
+}
+
+// subHandler handles subscribe events: it builds the application-level
+// subscribe states metric (on every call, without the metrics cache) and
+// increments it with event.Succ.
+func (rc *registryCollector) subHandler(event *RegistryMetricsEvent) {
+	level := metrics.GetApplicationLevel()
+	newStatesMetricFunc(SubscribeMetricNum, SubscribeMetricNumSucceed, SubscribeMetricNumFailed, level, rc.regRegistry).
+		Inc(event.Succ)
+}
+
+// notifyHandler handles notify events. It increments the notify request
+// counter, records event.End (as fractional seconds since the Unix epoch) on
+// the NotifyMetricNumLast histogram, and records the elapsed time in
+// milliseconds on the notify time metric.
+func (rc *registryCollector) notifyHandler(event *RegistryMetricsEvent) {
+	requests := rc.regRegistry.Counter(metrics.NewMetricId(NotifyMetricRequests, metrics.GetApplicationLevel()))
+	requests.Inc()
+
+	last := rc.regRegistry.Histogram(metrics.NewMetricId(NotifyMetricNumLast, metrics.GetApplicationLevel()))
+	last.Record(float64(event.End.UnixNano()) / float64(time.Second))
+
+	newRt := func() interface{} {
+		return newTimeMetrics(NotifyRtMillisecondsMin, NotifyRtMillisecondsMax, NotifyRtMillisecondsAvg, NotifyRtMillisecondsSum, NotifyRtMillisecondsLast, metrics.GetApplicationLevel(), rc.regRegistry)
+	}
+	rt := metrics.ComputeIfAbsentCache(dubboNotifyRt, newRt).(metrics.TimeMetric)
+	rt.Record(event.CostMs())
+}
+
+// directoryHandler handles directory events. The "DirTyp" attachment selects
+// which application-level directory counter is changed; unknown values are
+// ignored.
+// NOTE(review): NumAllDec calls Counter.Add(-1). With the prometheus-backed
+// registry that call is forwarded to prom.Counter.Add, which is documented to
+// panic on negative values; confirm before routing Directory events here.
+func (rc *registryCollector) directoryHandler(event *RegistryMetricsEvent) {
+	level := metrics.GetApplicationLevel()
+	counterOf := func(key *metrics.MetricKey) metrics.CounterMetric {
+		return rc.regRegistry.Counter(metrics.NewMetricId(key, level))
+	}
+	switch event.Attachment["DirTyp"] {
+	case NumAllInc:
+		counterOf(DirectoryMetricNumAll).Inc()
+	case NumAllDec:
+		counterOf(DirectoryMetricNumAll).Add(-1)
+	case NumDisableTotal:
+		counterOf(DirectoryMetricNumDisable).Inc()
+	case NumToReconnectTotal:
+		counterOf(DirectoryMetricNumToReconnect).Inc()
+	case NumValidTotal:
+		counterOf(DirectoryMetricNumValid).Inc()
+	}
+}
+
+// serverRegHandler handles server register events: it increments the
+// service-register states metric with event.Succ and records the elapsed time
+// in milliseconds on the service-register time metric. Both metrics are
+// obtained through metrics.ComputeIfAbsentCache.
+func (rc *registryCollector) serverRegHandler(event *RegistryMetricsEvent) {
+	newStates := func() interface{} {
+		return newStatesMetricFunc(ServiceRegisterMetricRequests, ServiceRegisterMetricRequestsSucceed, ServiceRegisterMetricRequestsFailed, metrics.GetApplicationLevel(), rc.regRegistry)
+	}
+	states := metrics.ComputeIfAbsentCache(dubboRegServerNum, newStates).(metrics.StatesMetrics)
+	states.Inc(event.Succ)
+
+	newRt := func() interface{} {
+		return newTimeMetrics(RegisterServiceRtMillisecondsMin, RegisterServiceRtMillisecondsMax, RegisterServiceRtMillisecondsAvg, RegisterServiceRtMillisecondsSum, RegisterServiceRtMillisecondsLast, metrics.GetApplicationLevel(), rc.regRegistry)
+	}
+	rt := metrics.ComputeIfAbsentCache(dubboRegServerRt, newRt).(metrics.TimeMetric)
+	rt.Record(event.CostMs())
+}
+
+// serverSubHandler handles server subscribe events: it builds the
+// service-subscribe states metric (on every call, without the metrics cache)
+// and increments it with event.Succ.
+func (rc *registryCollector) serverSubHandler(event *RegistryMetricsEvent) {
+	level := metrics.GetApplicationLevel()
+	newStatesMetricFunc(ServiceSubscribeMetricNum, ServiceSubscribeMetricNumSucceed, ServiceSubscribeMetricNumFailed, level, rc.regRegistry).
+		Inc(event.Succ)
+}
diff --git a/metrics/registry/event.go b/metrics/registry/event.go
new file mode 100644
index 0000000..87b135a
--- /dev/null
+++ b/metrics/registry/event.go
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package registry
+
+import (
+ "time"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+)
+
+// RegistryMetricsEvent contains info about register metrics
+type RegistryMetricsEvent struct {
+ // Name selects the handler the registry collector dispatches the event to.
+ Name MetricName
+ // Succ is passed to StatesMetrics.Inc by the handlers that count outcomes.
+ Succ bool
+ // Start and End bound the measured operation; CostMs returns End minus
+ // Start. Constructors that do not time anything leave both at zero.
+ Start time.Time
+ End time.Time
+ // Attachment carries extra values; NewDirectoryEvent stores "DirTyp" in it.
+ Attachment map[string]string
+}
+
+// Type returns constant.MetricsRegistry, the string the registry collector
+// subscribes with. It is declared on the value receiver, unlike CostMs.
+func (r RegistryMetricsEvent) Type() string {
+ return constant.MetricsRegistry
+}
+
+// CostMs returns the time between Start and End as a fractional number of
+// milliseconds.
+func (r *RegistryMetricsEvent) CostMs() float64 {
+	elapsed := r.End.Sub(r.Start)
+	return float64(elapsed) / float64(time.Millisecond)
+}
+
+// NewRegisterEvent returns a Reg event carrying succ, the given start time
+// and the current time as End.
+func NewRegisterEvent(succ bool, start time.Time) metrics.MetricsEvent {
+	event := &RegistryMetricsEvent{Name: Reg, Succ: succ, Start: start}
+	event.End = time.Now()
+	return event
+}
+
+// NewSubscribeEvent returns a Sub event carrying succ; no times are set.
+func NewSubscribeEvent(succ bool) metrics.MetricsEvent {
+	event := new(RegistryMetricsEvent)
+	event.Name = Sub
+	event.Succ = succ
+	return event
+}
+
+// NewNotifyEvent returns a Notify event with the given start time and the
+// current time as End; Succ is left false.
+func NewNotifyEvent(start time.Time) metrics.MetricsEvent {
+	event := &RegistryMetricsEvent{Name: Notify, Start: start}
+	event.End = time.Now()
+	return event
+}
+
+// NewDirectoryEvent returns a Directory event whose Attachment holds dirTyp
+// under the "DirTyp" key.
+func NewDirectoryEvent(dirTyp string) metrics.MetricsEvent {
+	event := new(RegistryMetricsEvent)
+	event.Name = Directory
+	event.Attachment = map[string]string{"DirTyp": dirTyp}
+	return event
+}
+
+// NewServerRegisterEvent returns a ServerReg event carrying succ, the given
+// start time and the current time as End.
+func NewServerRegisterEvent(succ bool, start time.Time) metrics.MetricsEvent {
+	event := &RegistryMetricsEvent{Name: ServerReg, Succ: succ, Start: start}
+	event.End = time.Now()
+	return event
+}
+
+// NewServerSubscribeEvent returns a ServerSub event carrying succ; no times
+// are set.
+func NewServerSubscribeEvent(succ bool) metrics.MetricsEvent {
+	event := new(RegistryMetricsEvent)
+	event.Name = ServerSub
+	event.Succ = succ
+	return event
+}
diff --git a/metrics/registry/metric_set.go b/metrics/registry/metric_set.go
new file mode 100644
index 0000000..02408bd
--- /dev/null
+++ b/metrics/registry/metric_set.go
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package registry
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+)
+
+// MetricName identifies the kind of a RegistryMetricsEvent; the registry
+// collector switches on it to choose a handler.
+type MetricName int8
+
+const (
+ // Reg is set by NewRegisterEvent.
+ Reg MetricName = iota
+ // Sub is set by NewSubscribeEvent.
+ Sub
+ // Notify is set by NewNotifyEvent.
+ Notify
+ // Directory is set by NewDirectoryEvent.
+ Directory
+ // ServerReg is set by NewServerRegisterEvent.
+ ServerReg
+ // ServerSub is set by NewServerSubscribeEvent.
+ ServerSub
+)
+
+// Values of the "DirTyp" attachment that directoryHandler switches on.
+const (
+ NumAllInc = "numAllInc"
+ NumAllDec = "numAllDec"
+ NumDisableTotal = "numDisableTotal"
+ NumToReconnectTotal = "numToReconnectTotal"
+ NumValidTotal = "numValidTotal"
+)
+
+// Keys passed to metrics.ComputeIfAbsentCache by the registry handlers.
+const (
+ dubboRegNum = "dubbo_registry_register_metrics_num"
+ dubboRegRt = "dubbo_registry_register_metrics_rt"
+ dubboRegServerNum = "dubbo_registry_register_server_metrics_num"
+ dubboRegServerRt = "dubbo_registry_register_server_metrics_rt"
+ dubboNotifyRt = "dubbo_notify_rt"
+)
+
+// Metric keys (name plus help text) used by the registry collector.
+var (
+ // register metrics key
+ RegisterMetricRequests = metrics.NewMetricKey("dubbo_registry_register_requests_total", "Total Register Requests")
+ RegisterMetricRequestsSucceed = metrics.NewMetricKey("dubbo_registry_register_requests_succeed_total", "Succeed Register Requests")
+ RegisterMetricRequestsFailed = metrics.NewMetricKey("dubbo_registry_register_requests_failed_total", "Failed Register Requests")
+
+ // subscribe metrics key
+ SubscribeMetricNum = metrics.NewMetricKey("dubbo_registry_subscribe_num_total", "Total Subscribe Num")
+ SubscribeMetricNumSucceed = metrics.NewMetricKey("dubbo_registry_subscribe_num_succeed_total", "Succeed Subscribe Num")
+ SubscribeMetricNumFailed = metrics.NewMetricKey("dubbo_registry_subscribe_num_failed_total", "Failed Subscribe Num")
+
+ // directory metrics key
+ DirectoryMetricNumAll = metrics.NewMetricKey("dubbo_registry_directory_num_all", "All Directory Urls")
+ DirectoryMetricNumValid = metrics.NewMetricKey("dubbo_registry_directory_num_valid_total", "Valid Directory Urls")
+ DirectoryMetricNumToReconnect = metrics.NewMetricKey("dubbo_registry_directory_num_to_reconnect_total", "ToReconnect Directory Urls")
+ DirectoryMetricNumDisable = metrics.NewMetricKey("dubbo_registry_directory_num_disable_total", "Disable Directory Urls")
+
+ // notify metrics key
+ NotifyMetricRequests = metrics.NewMetricKey("dubbo_registry_notify_requests_total", "Total Notify Requests")
+ NotifyMetricNumLast = metrics.NewMetricKey("dubbo_registry_notify_num_last", "Last Notify Nums")
+
+ // register service metrics key
+ ServiceRegisterMetricRequests = metrics.NewMetricKey("dubbo_registry_register_service_total", "Total Service-Level Register Requests")
+ ServiceRegisterMetricRequestsSucceed = metrics.NewMetricKey("dubbo_registry_register_service_succeed_total", "Succeed Service-Level Register Requests")
+ ServiceRegisterMetricRequestsFailed = metrics.NewMetricKey("dubbo_registry_register_service_failed_total", "Failed Service-Level Register Requests")
+
+ // subscribe service metrics key
+ ServiceSubscribeMetricNum = metrics.NewMetricKey("dubbo_registry_subscribe_service_num_total", "Total Service-Level Subscribe Num")
+ ServiceSubscribeMetricNumSucceed = metrics.NewMetricKey("dubbo_registry_subscribe_service_num_succeed_total", "Succeed Service-Level Num")
+ ServiceSubscribeMetricNumFailed = metrics.NewMetricKey("dubbo_registry_subscribe_service_num_failed_total", "Failed Service-Level Num")
+
+ // register service rt key
+ RegisterServiceRtMillisecondsAvg = metrics.NewMetricKey("dubbo_register_service_rt_milliseconds_avg", "Average Service Register Time")
+ RegisterServiceRtMillisecondsLast = metrics.NewMetricKey("dubbo_register_service_rt_milliseconds_last", "Last Service Register Time")
+ RegisterServiceRtMillisecondsMax = metrics.NewMetricKey("dubbo_register_service_rt_milliseconds_max", "Max Service Register Time")
+ RegisterServiceRtMillisecondsMin = metrics.NewMetricKey("dubbo_register_service_rt_milliseconds_min", "Min Service Register Time")
+ RegisterServiceRtMillisecondsSum = metrics.NewMetricKey("dubbo_register_service_rt_milliseconds_sum", "Sum Service Register Time")
+
+ // register rt key
+ RegisterRtMillisecondsMax = metrics.NewMetricKey("dubbo_register_rt_milliseconds_max", "Max Response Time")
+ RegisterRtMillisecondsLast = metrics.NewMetricKey("dubbo_register_rt_milliseconds_last", "Last Response Time")
+ RegisterRtMillisecondsAvg = metrics.NewMetricKey("dubbo_register_rt_milliseconds_avg", "Average Response Time")
+ RegisterRtMillisecondsSum = metrics.NewMetricKey("dubbo_register_rt_milliseconds_sum", "Sum Response Time")
+ RegisterRtMillisecondsMin = metrics.NewMetricKey("dubbo_register_rt_milliseconds_min", "Min Response Time")
+
+ // notify rt key
+ NotifyRtMillisecondsAvg = metrics.NewMetricKey("dubbo_notify_rt_milliseconds_avg", "Average Notify Time")
+ NotifyRtMillisecondsLast = metrics.NewMetricKey("dubbo_notify_rt_milliseconds_last", "Last Notify Time")
+ NotifyRtMillisecondsMax = metrics.NewMetricKey("dubbo_notify_rt_milliseconds_max", "Max Notify Time")
+ NotifyRtMillisecondsMin = metrics.NewMetricKey("dubbo_notify_rt_milliseconds_min", "Min Notify Time")
+ NotifyRtMillisecondsSum = metrics.NewMetricKey("dubbo_notify_rt_milliseconds_sum", "Sum Notify Time")
+)
diff --git a/metrics/reporter.go b/metrics/reporter.go
index 604d412..bf9693b 100644
--- a/metrics/reporter.go
+++ b/metrics/reporter.go
@@ -36,6 +36,7 @@
Path string
PushGatewayAddress string
SummaryMaxAge int64
+ Protocol string // MetricsRegistry extension configuration, e.g. prometheus
}
type ReportMode string
diff --git a/registry/base_registry.go b/registry/base_registry.go
index 0fa643c..a41e967 100644
--- a/registry/base_registry.go
+++ b/registry/base_registry.go
@@ -36,6 +36,8 @@
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+ metricsRegistry "dubbo.apache.org/dubbo-go/v3/metrics/registry"
)
const (
@@ -132,6 +134,7 @@
// Register implement interface registry to register
func (r *BaseRegistry) Register(url *common.URL) error {
// if developer define registry port and ip, use it first.
+ start := time.Now()
if ipToRegistry := os.Getenv(constant.DubboIpToRegistryKey); len(ipToRegistry) > 0 {
url.Ip = ipToRegistry
} else {
@@ -146,6 +149,7 @@
}
err := r.register(url)
+ defer metrics.Publish(metricsRegistry.NewRegisterEvent(err == nil, start))
if err == nil {
r.registered.Store(url.Key(), url)
@@ -161,8 +165,8 @@
if _, ok := r.registered.Load(url.Key()); !ok {
return perrors.Errorf("Service {%s} has not registered", url.Key())
}
-
err := r.unregister(url)
+ metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil))
if err == nil {
r.registered.Delete(url.Key())
} else {
diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index 7ba8ace..903fbdb 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -22,6 +22,7 @@
"net/url"
"os"
"sync"
+ "time"
)
import (
@@ -41,6 +42,8 @@
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
_ "dubbo.apache.org/dubbo-go/v3/config_center/configurator"
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+ metricsRegistry "dubbo.apache.org/dubbo-go/v3/metrics/registry"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
"dubbo.apache.org/dubbo-go/v3/registry"
@@ -97,7 +100,7 @@
if err := dir.registry.LoadSubscribeInstances(url.SubURL, dir); err != nil {
return nil, err
}
-
+ metrics.Publish(metricsRegistry.NewDirectoryEvent(metricsRegistry.NumAllInc))
return dir, nil
}
@@ -114,7 +117,9 @@
if event == nil {
return
}
+ start := time.Now()
dir.refreshInvokers(event)
+ metrics.Publish(metricsRegistry.NewNotifyEvent(start))
}
// NotifyAll notify the events that are complete Service Event List.
@@ -336,6 +341,7 @@
// uncacheInvoker will return abandoned Invoker, if no Invoker to be abandoned, return nil
func (dir *RegistryDirectory) uncacheInvoker(event *registry.ServiceEvent) []protocol.Invoker {
+ defer metrics.Publish(metricsRegistry.NewDirectoryEvent(metricsRegistry.NumDisableTotal))
if clusterID := event.Service.GetParam(constant.MeshClusterIDKey, ""); event.Service.Location == constant.MeshAnyAddrMatcher && clusterID != "" {
dir.uncacheInvokerWithClusterID(clusterID)
}
@@ -389,6 +395,7 @@
logger.Warnf("service will be added in cache invokers fail, result is null, invokers url is %+v", newUrl.String())
}
} else {
+ metrics.Publish(metricsRegistry.NewDirectoryEvent(metricsRegistry.NumValidTotal))
// if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy
// the old invoker.
if common.GetCompareURLEqualFunc()(newUrl, cacheInvoker.(protocol.Invoker).GetURL()) {
@@ -430,7 +437,7 @@
return true
}
}
-
+ metrics.Publish(metricsRegistry.NewDirectoryEvent(metricsRegistry.NumToReconnectTotal))
return false
}
@@ -444,6 +451,7 @@
ivk.Destroy()
}
})
+ metrics.Publish(metricsRegistry.NewDirectoryEvent(metricsRegistry.NumAllDec))
}
func (dir *RegistryDirectory) overrideUrl(targetUrl *common.URL) {
diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go
index 911d26b..bfb4575 100644
--- a/registry/nacos/registry.go
+++ b/registry/nacos/registry.go
@@ -38,6 +38,8 @@
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+ metricsRegistry "dubbo.apache.org/dubbo-go/v3/metrics/registry"
"dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/remoting"
"dubbo.apache.org/dubbo-go/v3/remoting/nacos"
@@ -112,11 +114,13 @@
// Register will register the service @url to its nacos registry center.
func (nr *nacosRegistry) Register(url *common.URL) error {
+ start := time.Now()
serviceName := getServiceName(url)
groupName := nr.URL.GetParam(constant.NacosGroupKey, defaultGroup)
param := createRegisterParam(url, serviceName, groupName)
logger.Infof("[Nacos Registry] Registry instance with param = %+v", param)
isRegistry, err := nr.namingClient.Client().RegisterInstance(param)
+ metrics.Publish(metricsRegistry.NewRegisterEvent(err == nil && isRegistry, start))
if err != nil {
return err
}
@@ -173,6 +177,7 @@
}
listener, err := nr.subscribe(url)
+ defer metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil))
if err != nil {
if !nr.IsAvailable() {
logger.Warnf("event listener game over.")
diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go
index 0b719ee..db0986d 100644
--- a/registry/servicediscovery/service_discovery_registry.go
+++ b/registry/servicediscovery/service_discovery_registry.go
@@ -21,6 +21,7 @@
"bytes"
"strings"
"sync"
+ "time"
)
import (
@@ -39,6 +40,9 @@
"dubbo.apache.org/dubbo-go/v3/metadata/mapping"
"dubbo.apache.org/dubbo-go/v3/metadata/service"
"dubbo.apache.org/dubbo-go/v3/metadata/service/local"
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+ metricMetadata "dubbo.apache.org/dubbo-go/v3/metrics/metadata"
+ metricsRegistry "dubbo.apache.org/dubbo-go/v3/metrics/registry"
"dubbo.apache.org/dubbo-go/v3/registry"
_ "dubbo.apache.org/dubbo-go/v3/registry/event"
"dubbo.apache.org/dubbo-go/v3/registry/servicediscovery/synthesizer"
@@ -175,7 +179,10 @@
return nil
}
common.HandleRegisterIPAndPort(url)
+
+ start := time.Now()
ok, err := s.metaDataService.ExportURL(url)
+ metrics.Publish(metricsRegistry.NewServerRegisterEvent(ok && err == nil, start))
if err != nil {
logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error())
@@ -244,7 +251,13 @@
}
s.serviceListeners[serviceNamesKey] = listener
listener.AddListenerAndNotify(protocolServiceKey, notify)
+ event := metricMetadata.NewMetadataMetricTimeEvent(metricMetadata.SubscribeServiceRt) // created before AddListener so the event spans the call
err = s.serviceDiscovery.AddListener(listener)
+ event.Succ = err == nil // success means AddListener returned no error (was inverted)
+ event.End = time.Now()
+ event.Attachment[constant.InterfaceKey] = url.Interface()
+ metrics.Publish(event)
+ metrics.Publish(metricsRegistry.NewServerSubscribeEvent(err == nil))
if err != nil {
logger.Errorf("add instance listener catch error,url:%s err:%s", url.String(), err.Error())
}