Merge branch 'main' into feature-triple
diff --git a/common/constant/key.go b/common/constant/key.go
index 2effa04..63e626f 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -413,6 +413,10 @@
 
 // metrics key
 const (
+	MetadataEnabledKey                   = "metrics.metadata.enabled"
+	RegistryEnabledKey                   = "metrics.registry.enabled"
+	ConfigCenterEnabledKey               = "metrics.config-center.enabled"
+	RpcEnabledKey                        = "metrics.rpc.enabled"
 	AggregationEnabledKey                = "aggregation.enabled"
 	AggregationBucketNumKey              = "aggregation.bucket.num"
 	AggregationTimeWindowSecondsKey      = "aggregation.time.window.seconds"
diff --git a/config/metric_config.go b/config/metric_config.go
index 0859b57..af41eb8 100644
--- a/config/metric_config.go
+++ b/config/metric_config.go
@@ -35,13 +35,17 @@
 
 // MetricConfig This is the config struct for all metrics implementation
 type MetricConfig struct {
-	Enable      *bool             `default:"false" yaml:"enable" json:"enable,omitempty" property:"enable"`
-	Port        string            `default:"9090" yaml:"port" json:"port,omitempty" property:"port"`
-	Path        string            `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"`
-	Protocol    string            `default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
-	Prometheus  *PrometheusConfig `yaml:"prometheus" json:"prometheus" property:"prometheus"`
-	Aggregation *AggregateConfig  `yaml:"aggregation" json:"aggregation" property:"aggregation"`
-	rootConfig  *RootConfig
+	Enable             *bool             `default:"false" yaml:"enable" json:"enable,omitempty" property:"enable"`
+	Port               string            `default:"9090" yaml:"port" json:"port,omitempty" property:"port"`
+	Path               string            `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"`
+	Protocol           string            `default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
+	EnableMetadata     *bool             `default:"true" yaml:"enable-metadata" json:"enable-metadata,omitempty" property:"enable-metadata"`                // false skips starting the metadata collector
+	EnableRegistry     *bool             `default:"true" yaml:"enable-registry" json:"enable-registry,omitempty" property:"enable-registry"`                // false skips starting the registry collector
+	EnableConfigCenter *bool             `default:"true" yaml:"enable-config-center" json:"enable-config-center,omitempty" property:"enable-config-center"` // false skips starting the config-center collector
+	EnableRpc          *bool             `default:"true" yaml:"enable-rpc" json:"enable-rpc,omitempty" property:"enable-rpc"`                               // false skips starting the rpc collector
+	Prometheus         *PrometheusConfig `yaml:"prometheus" json:"prometheus" property:"prometheus"`
+	Aggregation        *AggregateConfig  `yaml:"aggregation" json:"aggregation" property:"aggregation"`
+	rootConfig         *RootConfig
 }
 
 type AggregateConfig struct {
@@ -101,6 +105,26 @@
 	return &MetricConfigBuilder{metricConfig: &MetricConfig{}}
 }
 
+func (mcb *MetricConfigBuilder) SetMetadataEnabled(enabled bool) *MetricConfigBuilder {
+	mcb.metricConfig.EnableMetadata = &enabled // metadata metrics switch; &enabled is this call's own copy of the argument
+	return mcb
+}
+
+func (mcb *MetricConfigBuilder) SetRegistryEnabled(enabled bool) *MetricConfigBuilder {
+	mcb.metricConfig.EnableRegistry = &enabled // registry metrics switch; &enabled is this call's own copy of the argument
+	return mcb
+}
+
+func (mcb *MetricConfigBuilder) SetConfigCenterEnabled(enabled bool) *MetricConfigBuilder {
+	mcb.metricConfig.EnableConfigCenter = &enabled // config-center metrics switch; &enabled is this call's own copy of the argument
+	return mcb
+}
+
+func (mcb *MetricConfigBuilder) SetRpcEnabled(enabled bool) *MetricConfigBuilder {
+	mcb.metricConfig.EnableRpc = &enabled // rpc metrics switch; &enabled is this call's own copy of the argument
+	return mcb
+}
+
 func (mcb *MetricConfigBuilder) Build() *MetricConfig {
 	return mcb.metricConfig
 }
@@ -113,11 +137,15 @@
 // prometheus://localhost:9090?&histogram.enabled=false&prometheus.exporter.enabled=false
 func (mc *MetricConfig) toURL() *common.URL {
 	url, _ := common.NewURL("localhost", common.WithProtocol(mc.Protocol))
-	url.SetParam(constant.PrometheusExporterEnabledKey, strconv.FormatBool(*mc.Enable))
+	url.SetParam(constant.PrometheusExporterEnabledKey, strconv.FormatBool(*mc.Enable)) // for compatibility
 	url.SetParam(constant.PrometheusExporterMetricsPortKey, mc.Port)
 	url.SetParam(constant.PrometheusExporterMetricsPathKey, mc.Path)
 	url.SetParam(constant.ApplicationKey, mc.rootConfig.Application.Name)
 	url.SetParam(constant.AppVersionKey, mc.rootConfig.Application.Version)
+	url.SetParam(constant.MetadataEnabledKey, strconv.FormatBool(*mc.EnableMetadata))
+	url.SetParam(constant.RegistryEnabledKey, strconv.FormatBool(*mc.EnableRegistry))
+	url.SetParam(constant.ConfigCenterEnabledKey, strconv.FormatBool(*mc.EnableConfigCenter))
+	url.SetParam(constant.RpcEnabledKey, strconv.FormatBool(*mc.EnableRpc))
 	if mc.Aggregation != nil {
 		url.SetParam(constant.AggregationEnabledKey, strconv.FormatBool(*mc.Aggregation.Enabled))
 		url.SetParam(constant.AggregationBucketNumKey, strconv.Itoa(mc.Aggregation.BucketNum))
diff --git a/config/metric_config_test.go b/config/metric_config_test.go
index 70dce11..31a0ac6 100644
--- a/config/metric_config_test.go
+++ b/config/metric_config_test.go
@@ -26,9 +26,17 @@
 )
 
 func TestMetricConfigBuilder(t *testing.T) {
-	config := NewMetricConfigBuilder().Build()
-	err := config.Init(&RootConfig{Application: &ApplicationConfig{Name: "dubbo", Version: "1.0.0"}})
-	assert.NoError(t, err)
-	reporterConfig := config.ToReporterConfig()
-	assert.Equal(t, string(reporterConfig.Mode), "pull")
+	disabled := false // every switch below is expected to end up pointing at false
+	want := &MetricConfig{
+		EnableMetadata:     &disabled,
+		EnableRegistry:     &disabled,
+		EnableConfigCenter: &disabled,
+		EnableRpc:          &disabled,
+	}
+	got := NewMetricConfigBuilder().
+		SetMetadataEnabled(false).
+		SetRegistryEnabled(false).
+		SetConfigCenterEnabled(false).
+		SetRpcEnabled(false).Build()
+	assert.Equal(t, want, got)
 }
diff --git a/metrics/config_center/collector.go b/metrics/config_center/collector.go
index 9ae551f..3b4bb1e 100644
--- a/metrics/config_center/collector.go
+++ b/metrics/config_center/collector.go
@@ -30,9 +30,11 @@
 var info = metrics.NewMetricKey("dubbo_configcenter_total", "Config Changed Total")
 
 func init() {
-	metrics.AddCollector("config_center", func(mr metrics.MetricRegistry, _ *common.URL) {
-		c := &configCenterCollector{r: mr}
-		c.start()
+	metrics.AddCollector("config_center", func(mr metrics.MetricRegistry, url *common.URL) {
+		if !url.GetParamBool(constant.ConfigCenterEnabledKey, true) {
+			return // config-center metrics are switched off in the metrics URL
+		}
+		(&configCenterCollector{r: mr}).start()
 	})
 }
 
diff --git a/metrics/metadata/collector.go b/metrics/metadata/collector.go
index 7125fb1..8a08e0f 100644
--- a/metrics/metadata/collector.go
+++ b/metrics/metadata/collector.go
@@ -32,9 +32,11 @@
 var ch = make(chan metrics.MetricsEvent, 10)
 
 func init() {
-	metrics.AddCollector("metadata", func(mr metrics.MetricRegistry, _ *common.URL) {
-		l := &MetadataMetricCollector{metrics.BaseCollector{R: mr}}
-		l.start()
+	metrics.AddCollector("metadata", func(mr metrics.MetricRegistry, url *common.URL) {
+		if !url.GetParamBool(constant.MetadataEnabledKey, true) {
+			return // metadata metrics are switched off in the metrics URL
+		}
+		(&MetadataMetricCollector{metrics.BaseCollector{R: mr}}).start()
 	})
 }
 
diff --git a/metrics/prometheus/registry.go b/metrics/prometheus/registry.go
index f84f2ad..d108f8f 100644
--- a/metrics/prometheus/registry.go
+++ b/metrics/prometheus/registry.go
@@ -143,56 +143,62 @@
 
 func (p *promMetricRegistry) Export() {
 	if p.url.GetParamBool(constant.PrometheusExporterEnabledKey, false) {
-		go func() {
-			mux := http.NewServeMux()
-			path := p.url.GetParam(constant.PrometheusDefaultMetricsPath, constant.PrometheusDefaultMetricsPath)
-			port := p.url.GetParam(constant.PrometheusExporterMetricsPortKey, constant.PrometheusDefaultMetricsPort)
-			mux.Handle(path, promhttp.InstrumentMetricHandler(p.r, promhttp.HandlerFor(p.gather, promhttp.HandlerOpts{})))
-			srv := &http.Server{Addr: ":" + port, Handler: mux}
-			extension.AddCustomShutdownCallback(func() {
-				ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-				defer cancel()
-				if err := srv.Shutdown(ctx); nil != err {
-					logger.Fatalf("prometheus server shutdown failed, err: %v", err)
-				} else {
-					logger.Info("prometheus server gracefully shutdown success")
-				}
-			})
-			logger.Infof("prometheus endpoint :%s%s", port, path)
-			if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { // except Shutdown or Close
-				logger.Errorf("new prometheus server with error = %v", err)
-			}
-		}()
+		go p.exportHttp() // exportHttp blocks in ListenAndServe, so it runs on its own goroutine
 	}
 	if p.url.GetParamBool(constant.PrometheusPushgatewayEnabledKey, false) {
-		baseUrl, exist := p.url.GetNonDefaultParam(constant.PrometheusPushgatewayBaseUrlKey)
-		if !exist {
-			logger.Error("no pushgateway url found in config path: metrics.prometheus.pushgateway.bash-url, please check your config file")
-			return
-		}
-		username := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
-		password := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
-		job := p.url.GetParam(constant.PrometheusPushgatewayJobKey, constant.PrometheusDefaultJobName)
-		pushInterval := p.url.GetParamByIntValue(constant.PrometheusPushgatewayPushIntervalKey, constant.PrometheusDefaultPushInterval)
-		pusher := push.New(baseUrl, job).Gatherer(p.gather)
-		if len(username) != 0 {
-			pusher.BasicAuth(username, password)
-		}
-		logger.Infof("prometheus pushgateway will push to %s every %d seconds", baseUrl, pushInterval)
-		ticker := time.NewTicker(time.Duration(pushInterval) * time.Second)
-		go func() {
-			for range ticker.C {
-				err := pusher.Add()
-				if err != nil {
-					logger.Errorf("push metric data to prometheus push gateway error", err)
-				} else {
-					logger.Debugf("prometheus pushgateway push to %s success", baseUrl)
-				}
-			}
-		}()
+		p.exportPushgateway() // returns right away; the push loop runs on a goroutine it starts itself
 	}
 }
 
+func (p *promMetricRegistry) exportHttp() { // serves the metrics endpoint; blocks in ListenAndServe until the server stops
+	mux := http.NewServeMux()
+	path := p.url.GetParam(constant.PrometheusExporterMetricsPathKey, constant.PrometheusDefaultMetricsPath) // read the configured path key; the default path is only the fallback value
+	port := p.url.GetParam(constant.PrometheusExporterMetricsPortKey, constant.PrometheusDefaultMetricsPort)
+	mux.Handle(path, promhttp.InstrumentMetricHandler(p.r, promhttp.HandlerFor(p.gather, promhttp.HandlerOpts{})))
+	srv := &http.Server{Addr: ":" + port, Handler: mux}
+	extension.AddCustomShutdownCallback(func() { // registered before serving so the callback can stop this server
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+		if err := srv.Shutdown(ctx); err != nil {
+			logger.Fatalf("prometheus server shutdown failed, err: %v", err)
+		} else {
+			logger.Info("prometheus server gracefully shutdown success")
+		}
+	})
+	logger.Infof("prometheus endpoint :%s%s", port, path)
+	if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { // except Shutdown or Close
+		logger.Errorf("new prometheus server with error: %v", err)
+	}
+}
+
+// exportPushgateway starts a goroutine that pushes the gathered metrics to the configured Pushgateway on every tick, then returns.
+func (p *promMetricRegistry) exportPushgateway() {
+	baseUrl, exist := p.url.GetNonDefaultParam(constant.PrometheusPushgatewayBaseUrlKey)
+	if !exist {
+		logger.Error("no pushgateway base url found in config path: metrics.prometheus.pushgateway.base-url, please check your config")
+		return
+	}
+	username := p.url.GetParam(constant.PrometheusPushgatewayUsernameKey, "") // credentials have their own keys; reading the base-url key here sent the URL as user and password
+	password := p.url.GetParam(constant.PrometheusPushgatewayPasswordKey, "")
+	job := p.url.GetParam(constant.PrometheusPushgatewayJobKey, constant.PrometheusDefaultJobName)
+	pushInterval := p.url.GetParamByIntValue(constant.PrometheusPushgatewayPushIntervalKey, constant.PrometheusDefaultPushInterval)
+	pusher := push.New(baseUrl, job).Gatherer(p.gather)
+	if len(username) != 0 { // basic auth is only attached when a username is configured
+		pusher.BasicAuth(username, password)
+	}
+	logger.Infof("prometheus pushgateway will push to %s every %d seconds", baseUrl, pushInterval)
+	ticker := time.NewTicker(time.Duration(pushInterval) * time.Second) // NOTE(review): never stopped; the push loop has no shutdown path
+	go func() {
+		for range ticker.C {
+			if err := pusher.Add(); err != nil {
+				logger.Errorf("push metric data to prometheus pushgateway error: %v", err)
+			} else {
+				logger.Debugf("prometheus pushgateway push to %s success", baseUrl)
+			}
+		}
+	}()
+}
+
 func (p *promMetricRegistry) Scrape() (string, error) {
 	gathering, err := p.gather.Gather()
 	if err != nil {
diff --git a/metrics/registry/collector.go b/metrics/registry/collector.go
index 53a5d71..871dd46 100644
--- a/metrics/registry/collector.go
+++ b/metrics/registry/collector.go
@@ -28,9 +28,11 @@
 )
 
 func init() {
-	metrics.AddCollector("registry", func(m metrics.MetricRegistry, _ *common.URL) {
-		rc := &registryCollector{metrics.BaseCollector{R: m}}
-		go rc.start()
+	metrics.AddCollector("registry", func(m metrics.MetricRegistry, url *common.URL) {
+		if !url.GetParamBool(constant.RegistryEnabledKey, true) {
+			return // registry metrics are switched off in the metrics URL
+		}
+		go (&registryCollector{metrics.BaseCollector{R: m}}).start()
 	})
 }
 
diff --git a/metrics/rpc/collector.go b/metrics/rpc/collector.go
index dc9fb53..0888138 100644
--- a/metrics/rpc/collector.go
+++ b/metrics/rpc/collector.go
@@ -33,12 +33,14 @@
 
 // init will add the rpc collectorFunc to metrics.collectors slice, and lazy start the rpc collector goroutine
 func init() {
-	collectorFunc := func(registry metrics.MetricRegistry, c *common.URL) {
-		rc := &rpcCollector{
-			registry:  registry,
-			metricSet: buildMetricSet(registry),
+	collectorFunc := func(registry metrics.MetricRegistry, url *common.URL) {
+		if url.GetParamBool(constant.RpcEnabledKey, true) {
+			rc := &rpcCollector{
+				registry:  registry,
+				metricSet: buildMetricSet(registry),
+			}
+			go rc.start()
 		}
-		go rc.start()
 	}
 
 	metrics.AddCollector("rpc", collectorFunc)
@@ -93,6 +95,9 @@
 	if event.result != nil {
 		if event.result.Error() == nil {
 			c.incRequestsSucceedTotal(role, labels)
+		} else {
+			// TODO: Breaking down RPC exceptions further
+			c.incRequestsFailedTotal(role, labels)
 		}
 	}
 	c.reportRTMilliseconds(role, labels, event.costTime.Milliseconds())
@@ -147,6 +152,17 @@
 	}
 }
 
+// incRequestsFailedTotal bumps the failed-request counter and its aggregate counterpart for role; other roles are ignored.
+func (c *rpcCollector) incRequestsFailedTotal(role string, labels map[string]string) {
+	if role == constant.SideProvider {
+		c.metricSet.provider.requestsFailedTotal.Inc(labels)
+		c.metricSet.provider.requestsFailedTotalAggregate.Inc(labels)
+	} else if role == constant.SideConsumer {
+		c.metricSet.consumer.requestsFailedTotal.Inc(labels)
+		c.metricSet.consumer.requestsFailedTotalAggregate.Inc(labels)
+	}
+}
+
 func (c *rpcCollector) reportRTMilliseconds(role string, labels map[string]string, cost int64) {
 	switch role {
 	case constant.SideProvider:
diff --git a/metrics/rpc/metric_set.go b/metrics/rpc/metric_set.go
index a27d439..aa34946 100644
--- a/metrics/rpc/metric_set.go
+++ b/metrics/rpc/metric_set.go
@@ -43,6 +43,8 @@
 	requestsProcessingTotal       metrics.GaugeVec
 	requestsSucceedTotal          metrics.CounterVec
 	requestsSucceedTotalAggregate metrics.AggregateCounterVec
+	requestsFailedTotal           metrics.CounterVec
+	requestsFailedTotalAggregate  metrics.AggregateCounterVec
 	rtMilliseconds                metrics.RtVec
 	rtMillisecondsQuantiles       metrics.QuantileMetricVec
 	rtMillisecondsAggregate       metrics.RtVec
@@ -66,12 +68,14 @@
 	pm.requestsProcessingTotal = metrics.NewGaugeVec(registry, metrics.NewMetricKey("dubbo_provider_requests_processing_total", "The number of received requests being processed by the provider"))
 	pm.requestsSucceedTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_succeed_total", "The number of requests successfully received by the provider"))
 	pm.requestsSucceedTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_succeed_total_aggregate", "The number of successful requests received by the provider under the sliding window"))
+	pm.requestsFailedTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_failed_total", "Total Failed Requests"))
+	pm.requestsFailedTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_provider_requests_failed_total_aggregate", "Total Failed Aggregate Requests"))
 	pm.rtMilliseconds = metrics.NewRtVec(registry,
 		metrics.NewMetricKey("dubbo_provider_rt_milliseconds", "response time among all requests processed by the provider"),
 		&metrics.RtOpts{Aggregate: false},
 	)
 	pm.rtMillisecondsAggregate = metrics.NewRtVec(registry,
-		metrics.NewMetricKey("dubbo_provider_rt_milliseconds", "response time of the provider under the sliding window"),
+		metrics.NewMetricKey("dubbo_provider_rt", "response time of the provider under the sliding window"),
 		&metrics.RtOpts{Aggregate: true, BucketNum: metrics.DefaultBucketNum, TimeWindowSeconds: metrics.DefaultTimeWindowSeconds},
 	)
 	pm.rtMillisecondsQuantiles = metrics.NewQuantileMetricVec(registry, []*metrics.MetricKey{
@@ -89,12 +93,14 @@
 	cm.requestsProcessingTotal = metrics.NewGaugeVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_processing_total", "The number of received requests being processed by the consumer"))
 	cm.requestsSucceedTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_succeed_total", "The number of successful requests sent by consumers"))
 	cm.requestsSucceedTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_succeed_total_aggregate", "The number of successful requests sent by consumers under the sliding window"))
+	cm.requestsFailedTotal = metrics.NewCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_failed_total", "Total Failed Requests"))
+	cm.requestsFailedTotalAggregate = metrics.NewAggregateCounterVec(registry, metrics.NewMetricKey("dubbo_consumer_requests_failed_total_aggregate", "Total Failed Aggregate Requests"))
 	cm.rtMilliseconds = metrics.NewRtVec(registry,
 		metrics.NewMetricKey("dubbo_consumer_rt_milliseconds", "response time among all requests from consumers"),
 		&metrics.RtOpts{Aggregate: false},
 	)
 	cm.rtMillisecondsAggregate = metrics.NewRtVec(registry,
-		metrics.NewMetricKey("dubbo_consumer_rt_milliseconds", "response time of the consumer under the sliding window"),
+		metrics.NewMetricKey("dubbo_consumer_rt", "response time of the consumer under the sliding window"),
 		&metrics.RtOpts{Aggregate: true, BucketNum: metrics.DefaultBucketNum, TimeWindowSeconds: metrics.DefaultTimeWindowSeconds},
 	)
 	cm.rtMillisecondsQuantiles = metrics.NewQuantileMetricVec(registry, []*metrics.MetricKey{
diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go
index 5a5d7be..915f10b 100644
--- a/registry/zookeeper/service_discovery.go
+++ b/registry/zookeeper/service_discovery.go
@@ -64,9 +64,14 @@
 
 // newZookeeperServiceDiscovery the constructor of newZookeeperServiceDiscovery
 func newZookeeperServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
+	group := url.GetParam(constant.RegistryGroupKey, rootPath)
+	if !strings.HasPrefix(group, constant.PathSeparator) {
+		group = constant.PathSeparator + group
+	}
+
 	zksd := &zookeeperServiceDiscovery{
 		url:                 url,
-		rootPath:            rootPath,
+		rootPath:            group,
 		instanceListenerMap: make(map[string]*gxset.HashSet),
 	}
 	if err := zookeeper.ValidateZookeeperClient(zksd, url.Location); err != nil {
@@ -74,7 +79,7 @@
 	}
 	zksd.WaitGroup().Add(1) // zk client start successful, then wg +1
 	go zookeeper.HandleClientRestart(zksd)
-	zksd.csd = curator_discovery.NewServiceDiscovery(zksd.client, rootPath)
+	zksd.csd = curator_discovery.NewServiceDiscovery(zksd.client, group)
 	return zksd, nil
 }