blob: 5208fb4f1cf665f2ba0476effb02f088c1600df5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package universal
import (
"sync"
"dubbo.apache.org/dubbo-go/v3/common"
"github.com/apache/dubbo-kubernetes/pkg/admin/cache"
"github.com/apache/dubbo-kubernetes/pkg/admin/cache/selector"
"github.com/apache/dubbo-kubernetes/pkg/admin/constant"
"github.com/apache/dubbo-kubernetes/pkg/admin/model"
"github.com/apache/dubbo-kubernetes/pkg/admin/util"
)
var UniversalCacheInstance *UniversalCache
func NewUniversalCache() *UniversalCache {
return &UniversalCache{
providers: &cacheMap{
data: make(map[string]map[string]map[string]*DubboModel),
},
consumers: &cacheMap{
data: make(map[string]map[string]map[string]*DubboModel),
},
idCache: sync.Map{},
}
}
type UniversalCache struct {
providers *cacheMap
consumers *cacheMap
idCache sync.Map
}
// cacheMap is a cache container for dubbo provider or consumer, before reading or writing its data, you need to lock it.
type cacheMap struct {
data map[string]map[string]map[string]*DubboModel // application -> serviceKey -> serviceId -> model
lock sync.RWMutex
}
// set is used to store a DubboModel in cacheMap
func (cm *cacheMap) set(applicationName, serviceKey, serviceId string, model *DubboModel) {
cm.lock.Lock()
defer cm.lock.Unlock()
applicationMap := cm.data
if _, ok := applicationMap[applicationName]; !ok {
applicationMap[applicationName] = map[string]map[string]*DubboModel{}
}
serviceMap := applicationMap[applicationName]
if _, ok := serviceMap[serviceKey]; !ok {
serviceMap[serviceKey] = map[string]*DubboModel{}
}
instanceMap := serviceMap[serviceKey]
instanceMap[serviceId] = model
}
// get is used to get a DubboModel from cacheMap
func (cm *cacheMap) get(applicationName, serviceKey, serviceId string) (actual *DubboModel, ok bool) {
cm.lock.RLock()
defer cm.lock.RUnlock()
applicationMap := cm.data
if serviceMap, ok := applicationMap[applicationName]; !ok {
return nil, false
} else {
if instanceMap, ok := serviceMap[serviceKey]; !ok {
return nil, false
} else {
return instanceMap[serviceId], true
}
}
}
// delete is used to delete a DubboModel from cacheMap
func (cm *cacheMap) delete(applicationName, serviceKey, serviceId string) {
cm.lock.Lock()
defer cm.lock.Unlock()
applicationMap := cm.data
if serviceMap, ok := applicationMap[applicationName]; ok {
if instanceMap, ok := serviceMap[serviceKey]; ok {
delete(instanceMap, serviceId)
}
}
}
// GetApplications returns all applications in the registry.
func (uc *UniversalCache) GetApplications(namespace string) ([]*cache.ApplicationModel, error) {
applicationSet := map[string]struct{}{} // it's used to deduplicate
uc.providers.lock.RLock()
for name := range uc.providers.data {
applicationSet[name] = struct{}{}
}
uc.providers.lock.RUnlock()
uc.consumers.lock.RLock()
for name := range uc.consumers.data {
applicationSet[name] = struct{}{}
}
uc.consumers.lock.RUnlock()
applications := make([]*cache.ApplicationModel, 0, len(applicationSet))
for name := range applicationSet {
applications = append(applications, &cache.ApplicationModel{Name: name})
}
return applications, nil
}
func (uc *UniversalCache) GetWorkloads(namespace string) ([]*cache.WorkloadModel, error) {
return []*cache.WorkloadModel{}, nil
}
func (uc *UniversalCache) GetWorkloadsWithSelector(namespace string, selector selector.Selector) ([]*cache.WorkloadModel, error) {
return []*cache.WorkloadModel{}, nil
}
// GetInstances returns all instances in the registry.
//
// An instance is a URL record in the registry, and the key of instance is IP + Port.
func (uc *UniversalCache) GetInstances(namespace string) ([]*cache.InstanceModel, error) {
res := make([]*cache.InstanceModel, 0)
instanceSet := map[string]struct{}{} // it's used to deduplicate
uc.providers.lock.RLock()
for application, serviceMap := range uc.providers.data {
for serviceKey, instanceMap := range serviceMap {
for _, dubboModel := range instanceMap {
if _, ok := instanceSet[dubboModel.Ip+":"+dubboModel.Port]; ok {
continue
} else {
instanceSet[dubboModel.Ip+":"+dubboModel.Port] = struct{}{}
res = append(res, &cache.InstanceModel{
Application: &cache.ApplicationModel{Name: application},
Workload: nil,
Name: serviceKey + "#" + dubboModel.Ip + ":" + dubboModel.Port,
Ip: dubboModel.Ip,
Port: dubboModel.Port,
Status: "",
Node: "",
Labels: nil,
})
}
}
}
}
uc.providers.lock.RUnlock()
uc.consumers.lock.RLock()
for application, serviceMap := range uc.consumers.data {
for serviceKey, instanceMap := range serviceMap {
for _, dubboModel := range instanceMap {
if _, ok := instanceSet[dubboModel.Ip+":"+dubboModel.Port]; ok {
continue
} else {
instanceSet[dubboModel.Ip+":"+dubboModel.Port] = struct{}{}
res = append(res, &cache.InstanceModel{
Application: &cache.ApplicationModel{Name: application},
Workload: nil,
Name: serviceKey + "#" + dubboModel.Ip + ":" + dubboModel.Port,
Ip: dubboModel.Ip,
Port: dubboModel.Port,
Status: "",
Node: "",
Labels: nil,
})
}
}
}
}
uc.consumers.lock.RUnlock()
return res, nil
}
func (uc *UniversalCache) GetInstancesWithSelector(namespace string, selector selector.Selector) ([]*cache.InstanceModel, error) {
res := make([]*cache.InstanceModel, 0)
instanceSet := map[string]struct{}{}
uc.providers.lock.RLock()
for application, serviceMap := range uc.providers.data {
if !selectByApplication(selector, application) {
continue
}
for serviceKey, instanceMap := range serviceMap {
if !selectByServiceKey(selector, serviceKey) {
continue
}
for _, dubboModel := range instanceMap {
if _, ok := instanceSet[dubboModel.Ip+":"+dubboModel.Port]; ok {
continue
} else {
instanceSet[dubboModel.Ip+":"+dubboModel.Port] = struct{}{}
res = append(res, &cache.InstanceModel{
Application: &cache.ApplicationModel{Name: application},
Workload: nil,
Name: serviceKey + "#" + dubboModel.Ip + ":" + dubboModel.Port,
Ip: dubboModel.Ip,
Port: dubboModel.Port,
Status: "",
Node: "",
Labels: nil,
})
}
}
}
}
uc.providers.lock.RUnlock()
uc.consumers.lock.RLock()
for application, serviceMap := range uc.consumers.data {
if !selectByApplication(selector, application) {
continue
}
for serviceKey, instanceMap := range serviceMap {
if !selectByServiceKey(selector, serviceKey) {
continue
}
for _, dubboModel := range instanceMap {
if _, ok := instanceSet[dubboModel.Ip+":"+dubboModel.Port]; ok {
continue
} else {
instanceSet[dubboModel.Ip+":"+dubboModel.Port] = struct{}{}
res = append(res, &cache.InstanceModel{
Application: &cache.ApplicationModel{Name: application},
Workload: nil,
Name: serviceKey + "#" + dubboModel.Ip + ":" + dubboModel.Port,
Ip: dubboModel.Ip,
Port: dubboModel.Port,
Status: "",
Node: "",
Labels: nil,
})
}
}
}
}
uc.consumers.lock.RUnlock()
return res, nil
}
func (uc *UniversalCache) GetServices(namespace string) ([]*cache.ServiceModel, error) {
res := make([]*cache.ServiceModel, 0)
uc.providers.lock.RLock()
for application, serviceMap := range uc.providers.data {
for serviceKey := range serviceMap {
res = append(res, &cache.ServiceModel{
Application: &cache.ApplicationModel{Name: application},
Category: constant.ProviderSide,
Name: util.GetInterface(serviceKey),
Labels: nil,
ServiceKey: serviceKey,
Group: util.GetGroup(serviceKey),
Version: util.GetVersion(serviceKey),
})
}
}
uc.providers.lock.RUnlock()
uc.consumers.lock.RLock()
for application, serviceMap := range uc.consumers.data {
for serviceKey := range serviceMap {
res = append(res, &cache.ServiceModel{
Application: &cache.ApplicationModel{Name: application},
Category: constant.ConsumerSide,
Name: util.GetInterface(serviceKey),
Labels: nil,
ServiceKey: serviceKey,
Group: util.GetGroup(serviceKey),
Version: util.GetVersion(serviceKey),
})
}
}
uc.consumers.lock.RUnlock()
return res, nil
}
func (uc *UniversalCache) GetServicesWithSelector(namespace string, selector selector.Selector) ([]*cache.ServiceModel, error) {
res := make([]*cache.ServiceModel, 0)
uc.providers.lock.RLock()
for application, serviceMap := range uc.providers.data {
if !selectByApplication(selector, application) {
continue
}
for serviceKey := range serviceMap {
if !selectByServiceKey(selector, serviceKey) {
continue
}
res = append(res, &cache.ServiceModel{
Application: &cache.ApplicationModel{Name: application},
Category: constant.ProviderSide,
Name: util.GetInterface(serviceKey),
Labels: nil,
ServiceKey: serviceKey,
Group: util.GetGroup(serviceKey),
Version: util.GetVersion(serviceKey),
})
}
}
uc.providers.lock.RUnlock()
uc.consumers.lock.RLock()
for application, serviceMap := range uc.consumers.data {
if !selectByApplication(selector, application) {
continue
}
for serviceKey := range serviceMap {
if !selectByServiceKey(selector, serviceKey) {
continue
}
res = append(res, &cache.ServiceModel{
Application: &cache.ApplicationModel{Name: application},
Category: constant.ConsumerSide,
Name: util.GetInterface(serviceKey),
Labels: nil,
ServiceKey: serviceKey,
Group: util.GetGroup(serviceKey),
Version: util.GetVersion(serviceKey),
})
}
}
uc.consumers.lock.RUnlock()
return res, nil
}
func (uc *UniversalCache) getId(key string) string {
id, _ := uc.idCache.LoadOrStore(key, util.Md5_16bit(key))
return id.(string)
}
func (uc *UniversalCache) store(url *common.URL) {
if url == nil {
return
}
category := url.GetParam(constant.CategoryKey, "")
application := url.GetParam(constant.ApplicationKey, "")
serviceKey := url.ServiceKey()
serviceId := uc.getId(url.Key())
dubboModel := &DubboModel{}
switch category {
case constant.ProvidersCategory:
provider := &model.Provider{}
provider.InitByUrl(serviceId, url)
dubboModel.InitByProvider(provider, url)
uc.providers.set(application, serviceKey, serviceId, dubboModel)
case constant.ConsumersCategory:
consumer := &model.Consumer{}
consumer.InitByUrl(serviceId, url)
dubboModel.InitByConsumer(consumer, url)
uc.consumers.set(application, serviceKey, serviceId, dubboModel)
default:
return
}
}
func (uc *UniversalCache) delete(url *common.URL) {
if url == nil {
return
}
category := url.GetParam(constant.CategoryKey, "")
application := url.GetParam(constant.ApplicationKey, "")
serviceKey := url.ServiceKey()
serviceId := uc.getId(url.Key())
var targetCache *cacheMap
switch category {
case constant.ProvidersCategory:
targetCache = uc.providers
case constant.ConsumersCategory:
targetCache = uc.consumers
}
// used to determine whether to delete or change the registry type
isDeleteOrChangeRegisterType := func(actual *DubboModel, deleteRegistryType string) bool {
if actual.RegistryType == deleteRegistryType {
return true
}
if actual.RegistryType == constant.RegistryAll {
actual.ToggleRegistryType(deleteRegistryType)
}
return false
}
registryType := url.GetParam(constant.RegistryType, constant.RegistryInstance)
group := url.Group()
version := url.Version()
if group != constant.AnyValue && version != constant.AnyValue {
// delete by serviceKey and serviceId
if actual, ok := targetCache.get(application, serviceKey, serviceId); ok {
if isDeleteOrChangeRegisterType(actual, registryType) {
targetCache.delete(application, serviceKey, serviceId)
}
}
} else {
// support delete by wildcard search
wildcardDeleteFunc := func(serviceMap map[string]map[string]*DubboModel) {
for serviceKey := range serviceMap {
if util.GetInterface(serviceKey) == url.Service() &&
(group == constant.AnyValue || group == util.GetGroup(serviceKey)) &&
(version == constant.AnyValue || version == util.GetVersion(serviceKey)) {
deleteIds := make([]string, 0)
for id, m := range serviceMap[serviceKey] {
if isDeleteOrChangeRegisterType(m, registryType) {
deleteIds = append(deleteIds, id)
}
}
for _, id := range deleteIds {
targetCache.delete(application, serviceKey, id)
}
}
}
}
if application != "" {
serviceMap := targetCache.data[application]
wildcardDeleteFunc(serviceMap)
} else {
for _, serviceMap := range targetCache.data {
wildcardDeleteFunc(serviceMap)
}
}
}
}
// DubboModel is a dubbo provider or consumer in registry
type DubboModel struct {
Application string
Category string // provider or consumer
ServiceKey string // service key
Group string
Version string
Protocol string
Ip string
Port string
RegistryType string
Provider *model.Provider
Consumer *model.Consumer
}
func (m *DubboModel) InitByProvider(provider *model.Provider, url *common.URL) {
m.Provider = provider
m.Category = constant.ProviderSide
m.ServiceKey = provider.Service
m.Group = url.Group()
m.Version = url.Version()
m.Application = provider.Application
m.Protocol = url.Protocol
m.Ip = url.Ip
m.Port = url.Port
m.RegistryType = url.GetParam(constant.RegistryType, constant.RegistryInstance)
}
func (m *DubboModel) InitByConsumer(consumer *model.Consumer, url *common.URL) {
m.Consumer = consumer
m.Category = constant.ConsumerSide
m.ServiceKey = consumer.Service
m.Group = url.Group()
m.Version = url.Version()
m.Application = consumer.Application
m.Protocol = url.Protocol
m.Ip = url.Ip
m.Port = url.Port
m.RegistryType = url.GetParam(constant.RegistryType, constant.RegistryInstance)
}
func (m *DubboModel) ToggleRegistryType(deleteType string) {
if m.RegistryType != constant.RegistryAll {
return
}
if deleteType == constant.RegistryInstance {
m.RegistryType = constant.RegistryInterface
} else {
m.RegistryType = constant.RegistryInstance
}
}
// selectByServiceKey is used to determine whether the serviceKey matches the selector
func selectByServiceKey(selector selector.Selector, serviceKey string) bool {
if serviceNameOptions, ok := selector.ServiceNameOptions(); ok && !serviceNameOptions.Exist(util.GetInterface(serviceKey)) {
return false
}
if serviceGroupOptions, ok := selector.ServiceGroupOptions(); ok && !serviceGroupOptions.Exist(util.GetGroup(serviceKey)) {
return false
}
if serviceVersionOptions, ok := selector.ServiceVersionOptions(); ok && !serviceVersionOptions.Exist(util.GetVersion(serviceKey)) {
return false
}
return true
}
// selectByApplication is used to determine whether the application matches the selector
func selectByApplication(selector selector.Selector, application string) bool {
if applicationOptions, ok := selector.ApplicationOptions(); ok && !applicationOptions.Exist(application) {
return false
}
return true
}