Merge branch 'main' into feature-triple
diff --git a/common/constant/key.go b/common/constant/key.go
index 2effa04..63e626f 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -413,6 +413,10 @@
// metrics key
const (
+ MetadataEnabledKey = "metrics.metadata.enabled" // metadata collector switch; a missing value is read as enabled
+ RegistryEnabledKey = "metrics.registry.enabled" // registry collector switch; a missing value is read as enabled
+ ConfigCenterEnabledKey = "metrics.config-center.enabled" // config-center collector switch; a missing value is read as enabled
+ RpcEnabledKey = "metrics.rpc.enabled" // rpc collector switch; a missing value is read as enabled
AggregationEnabledKey = "aggregation.enabled"
AggregationBucketNumKey = "aggregation.bucket.num"
AggregationTimeWindowSecondsKey = "aggregation.time.window.seconds"
diff --git a/config/metric_config.go b/config/metric_config.go
index 0859b57..af41eb8 100644
--- a/config/metric_config.go
+++ b/config/metric_config.go
@@ -35,13 +35,17 @@
// MetricConfig This is the config struct for all metrics implementation
type MetricConfig struct {
- Enable *bool `default:"false" yaml:"enable" json:"enable,omitempty" property:"enable"`
- Port string `default:"9090" yaml:"port" json:"port,omitempty" property:"port"`
- Path string `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"`
- Protocol string `default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
- Prometheus *PrometheusConfig `yaml:"prometheus" json:"prometheus" property:"prometheus"`
- Aggregation *AggregateConfig `yaml:"aggregation" json:"aggregation" property:"aggregation"`
- rootConfig *RootConfig
+ Enable *bool `default:"false" yaml:"enable" json:"enable,omitempty" property:"enable"`
+ Port string `default:"9090" yaml:"port" json:"port,omitempty" property:"port"`
+ Path string `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"`
+ Protocol string `default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
+ EnableMetadata *bool `default:"true" yaml:"enable-metadata" json:"enable-metadata,omitempty" property:"enable-metadata"` // metadata metrics switch, forwarded by toURL as metrics.metadata.enabled
+ EnableRegistry *bool `default:"true" yaml:"enable-registry" json:"enable-registry,omitempty" property:"enable-registry"` // registry metrics switch, forwarded by toURL as metrics.registry.enabled
+ EnableConfigCenter *bool `default:"true" yaml:"enable-config-center" json:"enable-config-center,omitempty" property:"enable-config-center"` // config-center metrics switch, forwarded by toURL as metrics.config-center.enabled
+ EnableRpc *bool `default:"true" yaml:"enable-rpc" json:"enable-rpc,omitempty" property:"enable-rpc"` // rpc metrics switch, forwarded by toURL as metrics.rpc.enabled
+ Prometheus *PrometheusConfig `yaml:"prometheus" json:"prometheus" property:"prometheus"`
+ Aggregation *AggregateConfig `yaml:"aggregation" json:"aggregation" property:"aggregation"`
+ rootConfig *RootConfig
}
type AggregateConfig struct {
@@ -101,6 +105,26 @@
return &MetricConfigBuilder{metricConfig: &MetricConfig{}}
}
+func (mcb *MetricConfigBuilder) SetMetadataEnabled(enabled bool) *MetricConfigBuilder { // sets EnableMetadata; returns mcb for chaining
+ mcb.metricConfig.EnableMetadata = &enabled
+ return mcb
+}
+
+func (mcb *MetricConfigBuilder) SetRegistryEnabled(enabled bool) *MetricConfigBuilder { // sets EnableRegistry; returns mcb for chaining
+ mcb.metricConfig.EnableRegistry = &enabled
+ return mcb
+}
+
+func (mcb *MetricConfigBuilder) SetConfigCenterEnabled(enabled bool) *MetricConfigBuilder { // sets EnableConfigCenter; returns mcb for chaining
+ mcb.metricConfig.EnableConfigCenter = &enabled
+ return mcb
+}
+
+func (mcb *MetricConfigBuilder) SetRpcEnabled(enabled bool) *MetricConfigBuilder { // sets EnableRpc; returns mcb for chaining
+ mcb.metricConfig.EnableRpc = &enabled
+ return mcb
+}
+
func (mcb *MetricConfigBuilder) Build() *MetricConfig {
return mcb.metricConfig
}
@@ -113,11 +137,15 @@
// prometheus://localhost:9090?&histogram.enabled=false&prometheus.exporter.enabled=false
func (mc *MetricConfig) toURL() *common.URL {
url, _ := common.NewURL("localhost", common.WithProtocol(mc.Protocol))
- url.SetParam(constant.PrometheusExporterEnabledKey, strconv.FormatBool(*mc.Enable))
+ url.SetParam(constant.PrometheusExporterEnabledKey, strconv.FormatBool(*mc.Enable)) // for compatibility
url.SetParam(constant.PrometheusExporterMetricsPortKey, mc.Port)
url.SetParam(constant.PrometheusExporterMetricsPathKey, mc.Path)
url.SetParam(constant.ApplicationKey, mc.rootConfig.Application.Name)
url.SetParam(constant.AppVersionKey, mc.rootConfig.Application.Version)
+ url.SetParam(constant.MetadataEnabledKey, strconv.FormatBool(*mc.EnableMetadata)) // NOTE(review): these four derefs assume the default:"true" tags were applied first (presumably in Init) — a nil field would panic here
+ url.SetParam(constant.RegistryEnabledKey, strconv.FormatBool(*mc.EnableRegistry))
+ url.SetParam(constant.ConfigCenterEnabledKey, strconv.FormatBool(*mc.EnableConfigCenter))
+ url.SetParam(constant.RpcEnabledKey, strconv.FormatBool(*mc.EnableRpc))
if mc.Aggregation != nil {
url.SetParam(constant.AggregationEnabledKey, strconv.FormatBool(*mc.Aggregation.Enabled))
url.SetParam(constant.AggregationBucketNumKey, strconv.Itoa(mc.Aggregation.BucketNum))
diff --git a/config/metric_config_test.go b/config/metric_config_test.go
index 70dce11..31a0ac6 100644
--- a/config/metric_config_test.go
+++ b/config/metric_config_test.go
@@ -26,9 +26,17 @@
)
func TestMetricConfigBuilder(t *testing.T) {
- config := NewMetricConfigBuilder().Build()
- err := config.Init(&RootConfig{Application: &ApplicationConfig{Name: "dubbo", Version: "1.0.0"}})
- assert.NoError(t, err)
- reporterConfig := config.ToReporterConfig()
- assert.Equal(t, string(reporterConfig.Mode), "pull")
+ config := NewMetricConfigBuilder(). // every per-collector switch is turned off through the builder
+ SetConfigCenterEnabled(false).
+ SetMetadataEnabled(false).
+ SetRegistryEnabled(false).
+ SetRpcEnabled(false).
+ Build()
+ enable := false // shared target for the expected *bool fields
+ assert.Equal(t, &MetricConfig{
+ EnableConfigCenter: &enable,
+ EnableMetadata: &enable,
+ EnableRegistry: &enable,
+ EnableRpc: &enable,
+ }, config) // Build returns the builder's config as-is, so only the four switches are populated
}
diff --git a/metrics/config_center/collector.go b/metrics/config_center/collector.go
index 9ae551f..3b4bb1e 100644
--- a/metrics/config_center/collector.go
+++ b/metrics/config_center/collector.go
@@ -30,9 +30,11 @@
var info = metrics.NewMetricKey("dubbo_configcenter_total", "Config Changed Total")
func init() {
- metrics.AddCollector("config_center", func(mr metrics.MetricRegistry, _ *common.URL) {
- c := &configCenterCollector{r: mr}
- c.start()
+ metrics.AddCollector("config_center", func(mr metrics.MetricRegistry, url *common.URL) {
+ if !url.GetParamBool(constant.ConfigCenterEnabledKey, true) {
+ return // config-center metrics explicitly switched off; collector is never started
+ }
+ (&configCenterCollector{r: mr}).start()
})
}
diff --git a/metrics/metadata/collector.go b/metrics/metadata/collector.go
index 7125fb1..8a08e0f 100644
--- a/metrics/metadata/collector.go
+++ b/metrics/metadata/collector.go
@@ -32,9 +32,11 @@
var ch = make(chan metrics.MetricsEvent, 10)
func init() {
- metrics.AddCollector("metadata", func(mr metrics.MetricRegistry, _ *common.URL) {
- l := &MetadataMetricCollector{metrics.BaseCollector{R: mr}}
- l.start()
+ metrics.AddCollector("metadata", func(mr metrics.MetricRegistry, url *common.URL) {
+ if !url.GetParamBool(constant.MetadataEnabledKey, true) {
+ return // metadata metrics explicitly switched off; collector is never started
+ }
+ (&MetadataMetricCollector{metrics.BaseCollector{R: mr}}).start()
})
}
diff --git a/metrics/prometheus/registry.go b/metrics/prometheus/registry.go
index f84f2ad..d108f8f 100644
--- a/metrics/prometheus/registry.go
+++ b/metrics/prometheus/registry.go
@@ -143,56 +143,69 @@
func (p *promMetricRegistry) Export() {
if p.url.GetParamBool(constant.PrometheusExporterEnabledKey, false) {
- go func() {
- mux := http.NewServeMux()
- path := p.url.GetParam(constant.PrometheusDefaultMetricsPath, constant.PrometheusDefaultMetricsPath)
- port := p.url.GetParam(constant.PrometheusExporterMetricsPortKey, constant.PrometheusDefaultMetricsPort)
- mux.Handle(path, promhttp.InstrumentMetricHandler(p.r, promhttp.HandlerFor(p.gather, promhttp.HandlerOpts{})))
- srv := &http.Server{Addr: ":" + port, Handler: mux}
- extension.AddCustomShutdownCallback(func() {
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- if err := srv.Shutdown(ctx); nil != err {
- logger.Fatalf("prometheus server shutdown failed, err: %v", err)
- } else {
- logger.Info("prometheus server gracefully shutdown success")
- }
- })
- logger.Infof("prometheus endpoint :%s%s", port, path)
- if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { // except Shutdown or Close
- logger.Errorf("new prometheus server with error = %v", err)
- }
- }()
+ go p.exportHttp()
}
if p.url.GetParamBool(constant.PrometheusPushgatewayEnabledKey, false) {
- baseUrl, exist := p.url.GetNonDefaultParam(constant.PrometheusPushgatewayBaseUrlKey)
- if !exist {
- logger.Error("no pushgateway url found in config path: metrics.prometheus.pushgateway.bash-url, please check your config file")
- return
- }
- username := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
- password := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
- job := p.url.GetParam(constant.PrometheusPushgatewayJobKey, constant.PrometheusDefaultJobName)
- pushInterval := p.url.GetParamByIntValue(constant.PrometheusPushgatewayPushIntervalKey, constant.PrometheusDefaultPushInterval)
- pusher := push.New(baseUrl, job).Gatherer(p.gather)
- if len(username) != 0 {
- pusher.BasicAuth(username, password)
- }
- logger.Infof("prometheus pushgateway will push to %s every %d seconds", baseUrl, pushInterval)
- ticker := time.NewTicker(time.Duration(pushInterval) * time.Second)
- go func() {
- for range ticker.C {
- err := pusher.Add()
- if err != nil {
- logger.Errorf("push metric data to prometheus push gateway error", err)
- } else {
- logger.Debugf("prometheus pushgateway push to %s success", baseUrl)
- }
- }
- }()
+ p.exportPushgateway()
}
}
+// exportHttp serves the gathered metrics over HTTP on the configured port and path, and
+// registers a shutdown callback that stops the server. It blocks until the server exits.
+func (p *promMetricRegistry) exportHttp() {
+ mux := http.NewServeMux()
+ path := p.url.GetParam(constant.PrometheusExporterMetricsPathKey, constant.PrometheusDefaultMetricsPath)
+ port := p.url.GetParam(constant.PrometheusExporterMetricsPortKey, constant.PrometheusDefaultMetricsPort)
+ mux.Handle(path, promhttp.InstrumentMetricHandler(p.r, promhttp.HandlerFor(p.gather, promhttp.HandlerOpts{})))
+ srv := &http.Server{Addr: ":" + port, Handler: mux}
+ extension.AddCustomShutdownCallback(func() {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := srv.Shutdown(ctx); nil != err {
+ logger.Errorf("prometheus server shutdown failed, err: %v", err) // Errorf, not Fatalf: a failed metrics-server shutdown must not terminate the process
+ } else {
+ logger.Info("prometheus server gracefully shutdown success")
+ }
+ })
+ logger.Infof("prometheus endpoint :%s%s", port, path)
+ if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { // except Shutdown or Close
+ logger.Errorf("new prometheus server with error: %v", err)
+ }
+}
+
+// exportPushgateway periodically pushes the gathered metrics to the configured Pushgateway
+// from a background goroutine; it logs an error and returns if no base URL is configured.
+func (p *promMetricRegistry) exportPushgateway() {
+ baseUrl, exist := p.url.GetNonDefaultParam(constant.PrometheusPushgatewayBaseUrlKey)
+ if !exist {
+ logger.Error("no pushgateway base url found in config path: metrics.prometheus.pushgateway.base-url, please check your config")
+ return
+ }
+ username := p.url.GetParam(constant.PrometheusPushgatewayUsernameKey, "")
+ password := p.url.GetParam(constant.PrometheusPushgatewayPasswordKey, "")
+ job := p.url.GetParam(constant.PrometheusPushgatewayJobKey, constant.PrometheusDefaultJobName)
+ pushInterval := p.url.GetParamByIntValue(constant.PrometheusPushgatewayPushIntervalKey, constant.PrometheusDefaultPushInterval)
+ if pushInterval <= 0 { // time.NewTicker panics on a non-positive interval
+ pushInterval = constant.PrometheusDefaultPushInterval
+ }
+ pusher := push.New(baseUrl, job).Gatherer(p.gather)
+ if len(username) != 0 {
+ pusher.BasicAuth(username, password)
+ }
+ logger.Infof("prometheus pushgateway will push to %s every %d seconds", baseUrl, pushInterval)
+ ticker := time.NewTicker(time.Duration(pushInterval) * time.Second)
+ go func() {
+ for range ticker.C {
+ err := pusher.Add()
+ if err != nil {
+ logger.Errorf("push metric data to prometheus pushgateway error: %v", err)
+ } else {
+ logger.Debugf("prometheus pushgateway push to %s success", baseUrl)
+ }
+ }
+ }()
+}
+
func (p *promMetricRegistry) Scrape() (string, error) {
gathering, err := p.gather.Gather()
if err != nil {
diff --git a/metrics/registry/collector.go b/metrics/registry/collector.go
index 53a5d71..871dd46 100644
--- a/metrics/registry/collector.go
+++ b/metrics/registry/collector.go
@@ -28,9 +28,11 @@
)
func init() {
- metrics.AddCollector("registry", func(m metrics.MetricRegistry, _ *common.URL) {
- rc := &registryCollector{metrics.BaseCollector{R: m}}
- go rc.start()
+ metrics.AddCollector("registry", func(m metrics.MetricRegistry, url *common.URL) {
+ if url.GetParamBool(constant.RegistryEnabledKey, true) { // registry metrics stay on unless metrics.registry.enabled=false
+ rc := &registryCollector{metrics.BaseCollector{R: m}}
+ go rc.start()
+ }
})
}
diff --git a/metrics/rpc/collector.go b/metrics/rpc/collector.go
index dc9fb53..0888138 100644
--- a/metrics/rpc/collector.go
+++ b/metrics/rpc/collector.go
@@ -33,12 +33,14 @@
// init will add the rpc collectorFunc to metrics.collectors slice, and lazy start the rpc collector goroutine
func init() {
- collectorFunc := func(registry metrics.MetricRegistry, c *common.URL) {
- rc := &rpcCollector{
- registry: registry,
- metricSet: buildMetricSet(registry),
+ collectorFunc := func(registry metrics.MetricRegistry, url *common.URL) {
+ if url.GetParamBool(constant.RpcEnabledKey, true) { // collector starts unless rpc metrics are explicitly disabled
+ rc := &rpcCollector{
+ registry: registry,
+ metricSet: buildMetricSet(registry),
+ }
+ go rc.start()
}
- go rc.start()
}
metrics.AddCollector("rpc", collectorFunc)
@@ -93,6 +95,9 @@
if event.result != nil {
if event.result.Error() == nil {
c.incRequestsSucceedTotal(role, labels)
+ } else {
+ // TODO: Breaking down RPC exceptions further
+ c.incRequestsFailedTotal(role, labels) // any non-nil result error counts as one failure
}
}
c.reportRTMilliseconds(role, labels, event.costTime.Milliseconds())
@@ -147,6 +152,17 @@
}
}
+func (c *rpcCollector) incRequestsFailedTotal(role string, labels map[string]string) { // bumps the side's failed-request counter and its aggregate counter
+ switch role { // roles other than provider/consumer are ignored
+ case constant.SideProvider:
+ c.metricSet.provider.requestsFailedTotal.Inc(labels)
+ c.metricSet.provider.requestsFailedTotalAggregate.Inc(labels)
+ case constant.SideConsumer:
+ c.metricSet.consumer.requestsFailedTotal.Inc(labels)
+ c.metricSet.consumer.requestsFailedTotalAggregate.Inc(labels)
+ }
+}
+
func (c *rpcCollector) reportRTMilliseconds(role string, labels map[string]string, cost int64) {
switch role {
case constant.SideProvider:
diff --git a/metrics/rpc/metric_set.go b/metrics/rpc/metric_set.go
index a27d439..aa34946 100644
--- a/metrics/rpc/metric_set.go
+++ b/metrics/rpc/metric_set.go
@@ -43,6 +43,8 @@
requestsProcessingTotal metrics.GaugeVec
requestsSucceedTotal metrics.CounterVec
requestsSucceedTotalAggregate metrics.AggregateCounterVec
+ requestsFailedTotal metrics.CounterVec // counts invocations whose result carried an error
+ requestsFailedTotalAggregate metrics.AggregateCounterVec // same failures, counted by the aggregate (sliding-window) counter
rtMilliseconds metrics.RtVec
rtMillisecondsQuantiles metrics.QuantileMetricVec
rtMillisecondsAggregate metrics.RtVec
@@ -66,12 +68,14 @@
pm.requestsProcessingTotal = metrics.NewGaugeVec(registry, metrics.NewMetricKey("dubbo_provider_requests_processing_total", "The number of received requests being processed by the provider"))
pm.requestsSucceedTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_succeed_total", "The number of requests successfully received by the provider"))
pm.requestsSucceedTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_succeed_total_aggregate", "The number of successful requests received by the provider under the sliding window"))
+ pm.requestsFailedTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_failed_total", "The number of failed requests received by the provider"))
+ pm.requestsFailedTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_failed_total_aggregate", "The number of failed requests received by the provider under the sliding window"))
pm.rtMilliseconds = metrics.NewRtVec(registry,
metrics.NewMetricKey("dubbo_provider_rt_milliseconds", "response time among all requests processed by the provider"),
&metrics.RtOpts{Aggregate: false},
)
pm.rtMillisecondsAggregate = metrics.NewRtVec(registry,
- metrics.NewMetricKey("dubbo_provider_rt_milliseconds", "response time of the provider under the sliding window"),
+ metrics.NewMetricKey("dubbo_provider_rt", "response time of the provider under the sliding window"), // distinct from the rtMilliseconds key above, which this previously duplicated
&metrics.RtOpts{Aggregate: true, BucketNum: metrics.DefaultBucketNum, TimeWindowSeconds: metrics.DefaultTimeWindowSeconds},
)
pm.rtMillisecondsQuantiles = metrics.NewQuantileMetricVec(registry, []*metrics.MetricKey{
@@ -89,12 +93,14 @@
cm.requestsProcessingTotal = metrics.NewGaugeVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_processing_total", "The number of received requests being processed by the consumer"))
cm.requestsSucceedTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_succeed_total", "The number of successful requests sent by consumers"))
cm.requestsSucceedTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_succeed_total_aggregate", "The number of successful requests sent by consumers under the sliding window"))
+ cm.requestsFailedTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_failed_total", "The number of failed requests sent by consumers"))
+ cm.requestsFailedTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_failed_total_aggregate", "The number of failed requests sent by consumers under the sliding window"))
cm.rtMilliseconds = metrics.NewRtVec(registry,
metrics.NewMetricKey("dubbo_consumer_rt_milliseconds", "response time among all requests from consumers"),
&metrics.RtOpts{Aggregate: false},
)
cm.rtMillisecondsAggregate = metrics.NewRtVec(registry,
- metrics.NewMetricKey("dubbo_consumer_rt_milliseconds", "response time of the consumer under the sliding window"),
+ metrics.NewMetricKey("dubbo_consumer_rt", "response time of the consumer under the sliding window"), // distinct from the rtMilliseconds key above, which this previously duplicated
&metrics.RtOpts{Aggregate: true, BucketNum: metrics.DefaultBucketNum, TimeWindowSeconds: metrics.DefaultTimeWindowSeconds},
)
cm.rtMillisecondsQuantiles = metrics.NewQuantileMetricVec(registry, []*metrics.MetricKey{
diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go
index 5a5d7be..915f10b 100644
--- a/registry/zookeeper/service_discovery.go
+++ b/registry/zookeeper/service_discovery.go
@@ -64,9 +64,14 @@
// newZookeeperServiceDiscovery the constructor of newZookeeperServiceDiscovery
func newZookeeperServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
+ // A configured registry group replaces the default root path; it is
+ // normalised so that it always begins with the path separator.
+ group := url.GetParam(constant.RegistryGroupKey, rootPath)
+ group = constant.PathSeparator + strings.TrimPrefix(group, constant.PathSeparator)
+
zksd := &zookeeperServiceDiscovery{
url: url,
- rootPath: rootPath,
+ rootPath: group,
instanceListenerMap: make(map[string]*gxset.HashSet),
}
if err := zookeeper.ValidateZookeeperClient(zksd, url.Location); err != nil {
@@ -74,7 +79,7 @@
}
zksd.WaitGroup().Add(1) // zk client start successful, then wg +1
go zookeeper.HandleClientRestart(zksd)
- zksd.csd = curator_discovery.NewServiceDiscovery(zksd.client, rootPath)
+ zksd.csd = curator_discovery.NewServiceDiscovery(zksd.client, group)
return zksd, nil
}