blob: 581139fa5f8e00f8030bb24a3d7bd58b8cd87f10 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package counter
import (
"fmt"
"k8s.io/client-go/tools/cache"
"github.com/apache/dubbo-admin/pkg/core/events"
"github.com/apache/dubbo-admin/pkg/core/logger"
meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
resmodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
)
type CounterType string
type FieldExtractor func(resmodel.Resource) string
const (
ProtocolCounter CounterType = "protocol"
ReleaseCounter CounterType = "release"
DiscoveryCounter CounterType = "discovery"
)
type CounterManager interface {
RegisterSimpleCounter(kind resmodel.ResourceKind)
RegisterDistributionCounter(kind resmodel.ResourceKind, metric CounterType, extractor FieldExtractor)
Count(kind resmodel.ResourceKind) int64
Distribution(metric CounterType) map[string]int64
CountByMesh(kind resmodel.ResourceKind, mesh string) int64
DistributionByMesh(metric CounterType, mesh string) map[string]int64
Reset()
Bind(bus events.EventBus) error
}
type distributionCounterConfig struct {
counterType CounterType
counter *DistributionCounter
extractor func(resmodel.Resource) string
}
type counterManager struct {
simpleCounters map[resmodel.ResourceKind]*Counter
distributionConfigs map[resmodel.ResourceKind][]*distributionCounterConfig
distributionByType map[CounterType]*DistributionCounter
}
func NewCounterManager() CounterManager {
return newCounterManager()
}
func newCounterManager() *counterManager {
cm := &counterManager{
simpleCounters: make(map[resmodel.ResourceKind]*Counter),
distributionConfigs: make(map[resmodel.ResourceKind][]*distributionCounterConfig),
distributionByType: make(map[CounterType]*DistributionCounter),
}
cm.RegisterSimpleCounter(meshresource.ApplicationKind)
cm.RegisterSimpleCounter(meshresource.ServiceProviderMetadataKind)
cm.RegisterSimpleCounter(meshresource.InstanceKind)
cm.RegisterDistributionCounter(meshresource.InstanceKind, ProtocolCounter, instanceProtocolKey)
cm.RegisterDistributionCounter(meshresource.InstanceKind, ReleaseCounter, instanceReleaseKey)
cm.RegisterDistributionCounter(meshresource.InstanceKind, DiscoveryCounter, instanceMeshKey)
return cm
}
func (cm *counterManager) RegisterSimpleCounter(kind resmodel.ResourceKind) {
if kind == "" {
return
}
if _, exists := cm.simpleCounters[kind]; exists {
return
}
cm.simpleCounters[kind] = NewCounter(string(kind))
}
func (cm *counterManager) RegisterDistributionCounter(kind resmodel.ResourceKind, metric CounterType, extractor FieldExtractor) {
if kind == "" || metric == "" {
return
}
counter := cm.distributionByType[metric]
if counter == nil {
counter = NewDistributionCounter(string(metric))
cm.distributionByType[metric] = counter
}
configs := cm.distributionConfigs[kind]
for _, cfg := range configs {
if cfg.counterType == metric {
cfg.counter = counter
cfg.extractor = extractor
return
}
}
cm.distributionConfigs[kind] = append(configs, &distributionCounterConfig{
counterType: metric,
counter: counter,
extractor: extractor,
})
}
func (cm *counterManager) Reset() {
for _, counter := range cm.simpleCounters {
counter.Reset()
}
for _, counter := range cm.distributionByType {
counter.Reset()
}
}
func (cm *counterManager) Count(kind resmodel.ResourceKind) int64 {
if counter, exists := cm.simpleCounters[kind]; exists {
return counter.Get()
}
return 0
}
func (cm *counterManager) Distribution(metric CounterType) map[string]int64 {
counter, exists := cm.distributionByType[metric]
if !exists {
return map[string]int64{}
}
return counter.GetAll()
}
func (cm *counterManager) CountByMesh(kind resmodel.ResourceKind, mesh string) int64 {
if mesh == "" {
mesh = "default"
}
if counter, exists := cm.simpleCounters[kind]; exists {
return counter.GetByGroup(mesh)
}
return 0
}
func (cm *counterManager) DistributionByMesh(metric CounterType, mesh string) map[string]int64 {
if mesh == "" {
mesh = "default"
}
counter, exists := cm.distributionByType[metric]
if !exists {
return map[string]int64{}
}
return counter.GetByGroup(mesh)
}
func (cm *counterManager) Bind(bus events.EventBus) error {
handledKinds := make(map[resmodel.ResourceKind]struct{})
for kind := range cm.simpleCounters {
handledKinds[kind] = struct{}{}
}
for kind := range cm.distributionConfigs {
handledKinds[kind] = struct{}{}
}
for kind := range handledKinds {
resourceKind := kind
name := fmt.Sprintf("counter-manager/%s", resourceKind)
subscriber := &counterEventSubscriber{
kind: resourceKind,
name: name,
handler: func(event events.Event) error {
return cm.handleEvent(resourceKind, event)
},
}
if err := bus.Subscribe(subscriber); err != nil {
return err
}
logger.Infof("CounterManager subscribed to %s events", resourceKind)
}
logger.Infof("CounterManager bound to EventBus successfully")
return nil
}
func (cm *counterManager) handleEvent(kind resmodel.ResourceKind, event events.Event) error {
logger.Debugf("CounterManager handling %s event, type: %s", kind, event.Type())
if counter := cm.simpleCounters[kind]; counter != nil {
processSimpleCounter(counter, event)
}
if configs := cm.distributionConfigs[kind]; len(configs) > 0 {
for _, cfg := range configs {
processDistributionCounter(cfg, event)
}
}
return nil
}
func (cm *counterManager) getDistributionConfig(kind resmodel.ResourceKind, metric CounterType) *distributionCounterConfig {
configs := cm.distributionConfigs[kind]
for _, cfg := range configs {
if cfg.counterType == metric {
return cfg
}
}
return nil
}
func extractMeshName(res resmodel.Resource) string {
if res == nil {
return "default"
}
mesh := res.ResourceMesh()
if mesh == "" {
return "default"
}
return mesh
}
func processSimpleCounter(counter *Counter, event events.Event) {
mesh := extractMeshName(event.NewObj())
if event.NewObj() == nil {
mesh = extractMeshName(event.OldObj())
}
switch event.Type() {
case cache.Added:
counter.Increment(mesh)
logger.Debugf("CounterManager: Increment %s for mesh=%s, current count=%d", counter.name, mesh, counter.GetByGroup(mesh))
case cache.Sync, cache.Replaced:
if isNewResourceEvent(event) {
counter.Increment(mesh)
logger.Debugf("CounterManager: Increment %s for mesh=%s (Sync/Replaced), current count=%d", counter.name, mesh, counter.GetByGroup(mesh))
}
case cache.Deleted:
counter.Decrement(mesh)
logger.Debugf("CounterManager: Decrement %s for mesh=%s, current count=%d", counter.name, mesh, counter.GetByGroup(mesh))
case cache.Updated:
default:
}
}
func processDistributionCounter(cfg *distributionCounterConfig, event events.Event) {
mesh := extractMeshName(event.NewObj())
if event.NewObj() == nil {
mesh = extractMeshName(event.OldObj())
}
switch event.Type() {
case cache.Added:
key := cfg.extractFrom(event.NewObj())
cfg.counter.Increment(mesh, normalizeDistributionKey(key))
case cache.Sync, cache.Replaced:
if isNewResourceEvent(event) {
key := cfg.extractFrom(event.NewObj())
cfg.counter.Increment(mesh, normalizeDistributionKey(key))
} else {
cfg.update(event.OldObj(), event.NewObj())
}
case cache.Updated:
cfg.update(event.OldObj(), event.NewObj())
case cache.Deleted:
key := cfg.extractFrom(event.OldObj())
cfg.counter.Decrement(mesh, normalizeDistributionKey(key))
default:
}
}
func (cfg *distributionCounterConfig) extractFrom(res resmodel.Resource) string {
if cfg.extractor == nil || res == nil {
return ""
}
return cfg.extractor(res)
}
func (cfg *distributionCounterConfig) update(oldObj, newObj resmodel.Resource) {
oldKey := normalizeDistributionKey(cfg.extractFrom(oldObj))
newKey := normalizeDistributionKey(cfg.extractFrom(newObj))
oldMesh := extractMeshName(oldObj)
newMesh := extractMeshName(newObj)
if oldKey == newKey && oldMesh == newMesh {
return
}
if oldObj != nil {
cfg.counter.Decrement(oldMesh, oldKey)
}
if newObj != nil {
cfg.counter.Increment(newMesh, newKey)
}
}
func instanceProtocolKey(res resmodel.Resource) string {
instance, ok := res.(*meshresource.InstanceResource)
if !ok || instance == nil || instance.Spec == nil {
return ""
}
return instance.Spec.GetProtocol()
}
func instanceReleaseKey(res resmodel.Resource) string {
instance, ok := res.(*meshresource.InstanceResource)
if !ok || instance == nil || instance.Spec == nil {
return ""
}
return instance.Spec.GetReleaseVersion()
}
func instanceMeshKey(res resmodel.Resource) string {
instance, ok := res.(*meshresource.InstanceResource)
if !ok || instance == nil {
return ""
}
return instance.Mesh
}
func normalizeDistributionKey(key string) string {
if key == "" {
return "unknown"
}
return key
}
func isNewResourceEvent(event events.Event) bool {
if event == nil {
return false
}
return event.OldObj() == nil
}
type counterEventSubscriber struct {
kind resmodel.ResourceKind
name string
handler func(events.Event) error
}
func (s *counterEventSubscriber) ResourceKind() resmodel.ResourceKind {
return s.kind
}
func (s *counterEventSubscriber) Name() string {
return s.name
}
func (s *counterEventSubscriber) ProcessEvent(event events.Event) error {
if s.handler == nil {
return nil
}
return s.handler(event)
}