blob: 63840ed7de56088c853feedf04c1aaa14ab8b7db [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package registry
import (
"time"
)
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/metrics"
)
var (
registryChan = make(chan metrics.MetricsEvent, 128)
)
func init() {
metrics.AddCollector("registry", func(m metrics.MetricRegistry, c *metrics.ReporterConfig) {
rc := &registryCollector{regRegistry: m}
go rc.start()
})
}
// registryCollector is the registry's metrics collector
type registryCollector struct {
regRegistry metrics.MetricRegistry
}
func (rc *registryCollector) start() {
metrics.Subscribe(constant.MetricsRegistry, registryChan)
for event := range registryChan {
if registryEvent, ok := event.(*RegistryMetricsEvent); ok {
switch registryEvent.Name {
case Reg:
rc.regHandler(registryEvent)
case Sub:
rc.subHandler(registryEvent)
case Notify:
rc.notifyHandler(registryEvent)
case ServerReg:
rc.serverRegHandler(registryEvent)
case ServerSub:
rc.serverSubHandler(registryEvent)
default:
}
}
}
}
func newStatesMetricFunc(total *metrics.MetricKey, succ *metrics.MetricKey, fail *metrics.MetricKey,
level metrics.MetricLevel, reg metrics.MetricRegistry) metrics.StatesMetrics {
return metrics.NewStatesMetrics(metrics.NewMetricId(total, level), metrics.NewMetricId(succ, level),
metrics.NewMetricId(fail, level), reg)
}
func newTimeMetrics(min, max, avg, sum, last *metrics.MetricKey, level metrics.MetricLevel, mr metrics.MetricRegistry) metrics.TimeMetric {
return metrics.NewTimeMetric(metrics.NewMetricId(min, level), metrics.NewMetricId(max, level), metrics.NewMetricId(avg, level),
metrics.NewMetricId(sum, level), metrics.NewMetricId(last, level), mr)
}
// regHandler handles register metrics
func (rc *registryCollector) regHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics
// Save metrics to the MetricRegistry
m := metrics.ComputeIfAbsentCache(dubboRegNum, func() interface{} {
return newStatesMetricFunc(RegisterMetricRequests, RegisterMetricRequestsSucceed, RegisterMetricRequestsFailed, metrics.GetApplicationLevel(), rc.regRegistry)
}).(metrics.StatesMetrics)
m.Inc(event.Succ)
metric := metrics.ComputeIfAbsentCache(dubboRegRt, func() interface{} {
return newTimeMetrics(RegisterRtMillisecondsMin, RegisterRtMillisecondsMax, RegisterRtMillisecondsAvg, RegisterRtMillisecondsSum, RegisterRtMillisecondsLast, metrics.GetApplicationLevel(), rc.regRegistry)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
}
// subHandler handles subscribe metrics
func (rc *registryCollector) subHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics
// Save metrics to the MetricRegistry
m := newStatesMetricFunc(SubscribeMetricNum, SubscribeMetricNumSucceed, SubscribeMetricNumFailed, metrics.GetApplicationLevel(), rc.regRegistry)
m.Inc(event.Succ)
}
// notifyHandler handles notify metrics
func (rc *registryCollector) notifyHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics
// Save metrics to the MetricRegistry
rc.regRegistry.Counter(metrics.NewMetricId(NotifyMetricRequests, metrics.GetApplicationLevel())).Inc()
rc.regRegistry.Histogram(metrics.NewMetricId(NotifyMetricNumLast, metrics.GetApplicationLevel())).Observe(float64(event.End.UnixNano()) / float64(time.Second))
metric := metrics.ComputeIfAbsentCache(dubboNotifyRt, func() interface{} {
return newTimeMetrics(NotifyRtMillisecondsMin, NotifyRtMillisecondsMax, NotifyRtMillisecondsAvg, NotifyRtMillisecondsSum, NotifyRtMillisecondsLast, metrics.GetApplicationLevel(), rc.regRegistry)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
}
// directoryHandler handles directory metrics
func (rc *registryCollector) directoryHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics
// Save metrics to the MetricRegistry
level := metrics.GetApplicationLevel()
typ := event.Attachment["DirTyp"]
switch typ {
case NumAllInc:
rc.regRegistry.Counter(metrics.NewMetricId(DirectoryMetricNumAll, level)).Inc()
case NumAllDec:
rc.regRegistry.Counter(metrics.NewMetricId(DirectoryMetricNumAll, level)).Add(-1)
case NumDisableTotal:
rc.regRegistry.Counter(metrics.NewMetricId(DirectoryMetricNumDisable, level)).Inc()
case NumToReconnectTotal:
rc.regRegistry.Counter(metrics.NewMetricId(DirectoryMetricNumToReconnect, level)).Inc()
case NumValidTotal:
rc.regRegistry.Counter(metrics.NewMetricId(DirectoryMetricNumValid, level)).Inc()
default:
}
}
// serverRegHandler handles server register metrics
func (rc *registryCollector) serverRegHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics
// Save metrics to the MetricRegistry
m := metrics.ComputeIfAbsentCache(dubboRegServerNum, func() interface{} {
return newStatesMetricFunc(ServiceRegisterMetricRequests, ServiceRegisterMetricRequestsSucceed, ServiceRegisterMetricRequestsFailed, metrics.GetApplicationLevel(), rc.regRegistry)
}).(metrics.StatesMetrics)
m.Inc(event.Succ)
metric := metrics.ComputeIfAbsentCache(dubboRegServerRt, func() interface{} {
return newTimeMetrics(RegisterServiceRtMillisecondsMin, RegisterServiceRtMillisecondsMax, RegisterServiceRtMillisecondsAvg, RegisterServiceRtMillisecondsSum, RegisterServiceRtMillisecondsLast, metrics.GetApplicationLevel(), rc.regRegistry)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
}
// serverSubHandler handles server subscribe metrics
func (rc *registryCollector) serverSubHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics
// Save metrics to the MetricRegistry
m := newStatesMetricFunc(ServiceSubscribeMetricNum, ServiceSubscribeMetricNumSucceed, ServiceSubscribeMetricNumFailed, metrics.GetApplicationLevel(), rc.regRegistry)
m.Inc(event.Succ)
}