blob: c07b3aea00daa0fe8c40cb880d99f86e29b27f82 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package traditional
import (
"context"
"fmt"
"strings"
"sync"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
dubboconstant "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config_center"
dubbo_identifier "dubbo.apache.org/dubbo-go/v3/metadata/identifier"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry"
"github.com/pkg/errors"
"golang.org/x/exp/maps"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/core/consts"
"github.com/apache/dubbo-kubernetes/pkg/core/governance"
"github.com/apache/dubbo-kubernetes/pkg/core/logger"
"github.com/apache/dubbo-kubernetes/pkg/core/reg_client"
"github.com/apache/dubbo-kubernetes/pkg/core/registry"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
"github.com/apache/dubbo-kubernetes/pkg/events"
util_k8s "github.com/apache/dubbo-kubernetes/pkg/util/k8s"
)
const (
dubboGroup = "dubbo"
mappingGroup = "mapping"
dubboConfig = "config"
metadataGroup = "metadata"
cpGroup = "dubbo-cp"
pathSeparator = "/"
)
type traditionalStore struct {
configCenter config_center.DynamicConfiguration
metadataReport report.MetadataReport
registryCenter dubboRegistry.Registry
governance governance.GovernanceConfig
appContext *registry.ApplicationContext
infContext *registry.InterfaceContext
dCache *sync.Map
regClient reg_client.RegClient
eventWriter events.Emitter
mu sync.RWMutex
}
func NewStore(
configCenter config_center.DynamicConfiguration,
metadataReport report.MetadataReport,
registryCenter dubboRegistry.Registry,
governance governance.GovernanceConfig,
dCache *sync.Map,
regClient reg_client.RegClient,
appContext *registry.ApplicationContext,
infContext *registry.InterfaceContext,
) store.ResourceStore {
return &traditionalStore{
configCenter: configCenter,
metadataReport: metadataReport,
registryCenter: registryCenter,
governance: governance,
dCache: dCache,
regClient: regClient,
appContext: appContext,
infContext: infContext,
}
}
func (t *traditionalStore) SetEventWriter(writer events.Emitter) {
t.mu.Lock()
defer t.mu.Unlock()
t.eventWriter = writer
}
func (t *traditionalStore) Create(ctx context.Context, resource core_model.Resource, fs ...store.CreateOptionsFunc) error {
var err error
opts := store.NewCreateOptions(fs...)
if opts.Name == core_model.DefaultMesh {
opts.Name += ".universal"
}
name, _, err := util_k8s.CoreNameToK8sName(opts.Name)
if err != nil {
return err
}
switch resource.Descriptor().Name {
case mesh.MappingType:
spec := resource.GetSpec()
mapping := spec.(*mesh_proto.Mapping)
appNames := mapping.ApplicationNames
serviceInterface := mapping.InterfaceName
for _, app := range appNames {
err = t.metadataReport.RegisterServiceAppMapping(serviceInterface, mappingGroup, app)
if err != nil {
return err
}
}
case mesh.MetaDataType:
spec := resource.GetSpec()
metadata := spec.(*mesh_proto.MetaData)
identifier := &dubbo_identifier.SubscriberMetadataIdentifier{
Revision: metadata.GetRevision(),
BaseApplicationMetadataIdentifier: dubbo_identifier.BaseApplicationMetadataIdentifier{
Application: metadata.GetApp(),
Group: dubboGroup,
},
}
services := map[string]*common.ServiceInfo{}
// 把metadata赋值到services中
for key, serviceInfo := range metadata.GetServices() {
services[key] = &common.ServiceInfo{
Name: serviceInfo.GetName(),
Group: serviceInfo.GetGroup(),
Version: serviceInfo.GetVersion(),
Protocol: serviceInfo.GetProtocol(),
Path: serviceInfo.GetPath(),
Params: serviceInfo.GetParams(),
}
}
info := &common.MetadataInfo{
App: metadata.GetApp(),
Revision: metadata.GetRevision(),
Services: services,
}
err = t.metadataReport.PublishAppMetadata(identifier, info)
if err != nil {
return err
}
case mesh.DataplaneType:
// Dataplane无法Create, 只能Get和List
case mesh.TagRouteType:
labels := opts.Labels
base := mesh_proto.Base{
Application: labels[mesh_proto.Application],
Service: labels[mesh_proto.Service],
ID: labels[mesh_proto.ID],
ServiceVersion: labels[mesh_proto.ServiceVersion],
ServiceGroup: labels[mesh_proto.ServiceGroup],
}
key := mesh_proto.BuildServiceKey(base)
path := mesh_proto.GetRoutePath(key, consts.TagRoute)
bytes, err := core_model.ToYAML(resource.GetSpec())
if err != nil {
return err
}
cfg, _ := t.governance.GetConfig(path)
if cfg != "" {
return fmt.Errorf("%s Config is exsited ", path)
}
err = t.governance.SetConfig(path, string(bytes))
if err != nil {
return err
}
case mesh.ConditionRouteType:
labels := opts.Labels
base := mesh_proto.Base{
Application: labels[mesh_proto.Application],
Service: labels[mesh_proto.Service],
ID: labels[mesh_proto.ID],
ServiceVersion: labels[mesh_proto.ServiceVersion],
ServiceGroup: labels[mesh_proto.ServiceGroup],
}
key := mesh_proto.BuildServiceKey(base)
path := mesh_proto.GetRoutePath(key, consts.ConditionRoute)
bytes, err := resource.GetSpec().(*mesh_proto.ConditionRoute).ToYAML()
if err != nil {
return err
}
cfg, _ := t.governance.GetConfig(path)
if cfg != "" {
return fmt.Errorf("%s Config is exsited ", path)
}
err = t.governance.SetConfig(path, string(bytes))
if err != nil {
return err
}
case mesh.DynamicConfigType:
labels := opts.Labels
base := mesh_proto.Base{
Application: labels[mesh_proto.Application],
Service: labels[mesh_proto.Service],
ID: labels[mesh_proto.ID],
ServiceVersion: labels[mesh_proto.ServiceVersion],
ServiceGroup: labels[mesh_proto.ServiceGroup],
}
key := mesh_proto.BuildServiceKey(base)
path := mesh_proto.GetOverridePath(key)
bytes, err := core_model.ToYAML(resource.GetSpec())
if err != nil {
return err
}
cfg, _ := t.governance.GetConfig(path)
if cfg != "" {
return fmt.Errorf("%s Config is exsited ", path)
}
err = t.governance.SetConfig(path, string(bytes))
if err != nil {
return err
}
case mesh.AffinityRouteType:
labels := opts.Labels
base := mesh_proto.Base{
Application: labels[mesh_proto.Application],
Service: labels[mesh_proto.Service],
ID: labels[mesh_proto.ID],
ServiceVersion: labels[mesh_proto.ServiceVersion],
ServiceGroup: labels[mesh_proto.ServiceGroup],
}
key := mesh_proto.BuildServiceKey(base)
path := mesh_proto.GetRoutePath(key, consts.AffinityRoute)
bytes, err := core_model.ToYAML(resource.GetSpec())
if err != nil {
return err
}
cfg, _ := t.governance.GetConfig(path)
if cfg != "" {
return fmt.Errorf("%s Config is exsited ", path)
}
err = t.governance.SetConfig(path, string(bytes))
if err != nil {
return err
}
default:
bytes, err := core_model.ToYAML(resource.GetSpec())
if err != nil {
return err
}
path := GenerateCpGroupPath(string(resource.Descriptor().Name), name)
// 使用RegClient
err = t.regClient.SetContent(path, bytes)
if err != nil {
return err
}
}
resource.SetMeta(&resourceMetaObject{
Name: name,
Mesh: opts.Mesh,
CreationTime: opts.CreationTime,
ModificationTime: opts.CreationTime,
Labels: maps.Clone(opts.Labels),
})
if t.eventWriter != nil {
go func() {
t.eventWriter.Send(events.ResourceChangedEvent{
Operation: events.Create,
Type: resource.Descriptor().Name,
Key: core_model.MetaToResourceKey(&resourceMetaObject{
Name: name,
Mesh: opts.Mesh,
}),
})
}()
}
return nil
}
func (t *traditionalStore) Update(ctx context.Context, resource core_model.Resource, fs ...store.UpdateOptionsFunc) error {
opts := store.NewUpdateOptions(fs...)
if opts.Name == core_model.DefaultMesh {
opts.Name += ".universal"
}
name, _, err := util_k8s.CoreNameToK8sName(opts.Name)
if err != nil {
return err
}
switch resource.Descriptor().Name {
case mesh.DataplaneType:
// Dataplane资源无法更新, 只能获取和删除
case mesh.TagRouteType:
labels := opts.Labels
base := mesh_proto.Base{
Application: labels[mesh_proto.Application],
Service: labels[mesh_proto.Service],
ID: labels[mesh_proto.ID],
ServiceVersion: labels[mesh_proto.ServiceVersion],
ServiceGroup: labels[mesh_proto.ServiceGroup],
}
id := mesh_proto.BuildServiceKey(base)
path := mesh_proto.GetRoutePath(id, consts.TagRoute)
cfg, err := t.governance.GetConfig(path)
if err != nil {
return err
}
if cfg == "" {
return core_store.ErrorResourceNotFound(resource.Descriptor().Name, opts.Name, opts.Mesh)
}
bytes, err := core_model.ToYAML(resource.GetSpec())
if err != nil {
return err
}
err = t.governance.SetConfig(path, string(bytes))
if err != nil {
return err
}
case mesh.ConditionRouteType:
labels := opts.Labels
base := mesh_proto.Base{
Application: labels[mesh_proto.Application],
Service: labels[mesh_proto.Service],
ID: labels[mesh_proto.ID],
ServiceVersion: labels[mesh_proto.ServiceVersion],
ServiceGroup: labels[mesh_proto.ServiceGroup],
}
id := mesh_proto.BuildServiceKey(base)
path := mesh_proto.GetRoutePath(id, consts.ConditionRoute)
cfg, err := t.governance.GetConfig(path)
if err != nil {
return err
}
if cfg == "" {
return core_store.ErrorResourceNotFound(resource.Descriptor().Name, opts.Name, opts.Mesh)
}
bytes, err := resource.GetSpec().(*mesh_proto.ConditionRoute).ToYAML()
if err != nil {
return err
}
err = t.governance.SetConfig(path, string(bytes))
if err != nil {
return err
}
case mesh.DynamicConfigType:
labels := opts.Labels
base := mesh_proto.Base{
Application: labels[mesh_proto.Application],
Service: labels[mesh_proto.Service],
ID: labels[mesh_proto.ID],
ServiceVersion: labels[mesh_proto.ServiceVersion],
ServiceGroup: labels[mesh_proto.ServiceGroup],
}
id := mesh_proto.BuildServiceKey(base)
path := mesh_proto.GetOverridePath(id)
existConfig, err := t.governance.GetConfig(path)
if err != nil {
return err
} else if existConfig == "" {
return core_store.ErrorResourceNotFound(resource.Descriptor().Name, opts.Name, opts.Mesh)
}
if b, err := core_model.ToYAML(resource.GetSpec()); err != nil {
return err
} else {
err := t.governance.SetConfig(path, string(b))
if err != nil {
return err
}
}
case mesh.AffinityRouteType:
labels := opts.Labels
base := mesh_proto.Base{
Application: labels[mesh_proto.Application],
Service: labels[mesh_proto.Service],
ID: labels[mesh_proto.ID],
ServiceVersion: labels[mesh_proto.ServiceVersion],
ServiceGroup: labels[mesh_proto.ServiceGroup],
}
id := mesh_proto.BuildServiceKey(base)
path := mesh_proto.GetRoutePath(id, consts.AffinityRoute)
existConfig, err := t.governance.GetConfig(path)
if err != nil {
return err
} else if existConfig == "" {
return core_store.ErrorResourceNotFound(resource.Descriptor().Name, opts.Name, opts.Mesh)
}
if b, err := core_model.ToYAML(resource.GetSpec()); err != nil {
return err
} else {
err := t.governance.SetConfig(path, string(b))
if err != nil {
return err
}
}
case mesh.MappingType:
spec := resource.GetSpec()
mapping := spec.(*mesh_proto.Mapping)
appNames := mapping.ApplicationNames
serviceInterface := mapping.InterfaceName
for _, app := range appNames {
path := getMappingPath(serviceInterface)
// 先使用regClient判断是否存在, 如果存在的话就先删除再更新
bytes, err := t.regClient.GetContent(path)
if err != nil {
return err
}
if len(bytes) != 0 {
// 说明有内容, 需要先删除
err := t.regClient.DeleteContent(path)
if err != nil {
return err
}
}
err = t.metadataReport.RegisterServiceAppMapping(serviceInterface, mappingGroup, app)
if err != nil {
return err
}
}
case mesh.MetaDataType:
spec := resource.GetSpec()
metadata := spec.(*mesh_proto.MetaData)
identifier := &dubbo_identifier.SubscriberMetadataIdentifier{
Revision: metadata.GetRevision(),
BaseApplicationMetadataIdentifier: dubbo_identifier.BaseApplicationMetadataIdentifier{
Application: metadata.GetApp(),
Group: dubboGroup,
},
}
// 先判断identifier是否存在, 如果存在到话需要将其删除
content, err := t.regClient.GetContent(getMetadataPath(metadata.GetApp(), metadata.GetRevision()))
if err != nil {
return err
}
if len(content) != 0 {
// 如果不为空, 先删除
err := t.regClient.DeleteContent(getMetadataPath(metadata.GetApp(), metadata.GetRevision()))
if err != nil {
return err
}
}
services := map[string]*common.ServiceInfo{}
// 把metadata赋值到services中
for key, serviceInfo := range metadata.GetServices() {
services[key] = &common.ServiceInfo{
Name: serviceInfo.GetName(),
Group: serviceInfo.GetGroup(),
Version: serviceInfo.GetVersion(),
Protocol: serviceInfo.GetProtocol(),
Path: serviceInfo.GetPath(),
Params: serviceInfo.GetParams(),
}
}
info := &common.MetadataInfo{
App: metadata.GetApp(),
Revision: metadata.GetRevision(),
Services: services,
}
err = t.metadataReport.PublishAppMetadata(identifier, info)
if err != nil {
return err
}
default:
bytes, err := core_model.ToYAML(resource.GetSpec())
if err != nil {
return err
}
path := GenerateCpGroupPath(string(resource.Descriptor().Name), name)
// 使用RegClient
err = t.regClient.SetContent(path, bytes)
if err != nil {
return err
}
}
resource.SetMeta(&resourceMetaObject{
Name: name,
Mesh: opts.Mesh,
ModificationTime: opts.ModificationTime,
Labels: maps.Clone(opts.Labels),
})
if t.eventWriter != nil {
go func() {
t.eventWriter.Send(events.ResourceChangedEvent{
Operation: events.Update,
Type: resource.Descriptor().Name,
Key: core_model.MetaToResourceKey(&resourceMetaObject{
Name: name,
Mesh: opts.Mesh,
}),
})
}()
}
return nil
}
func (t *traditionalStore) Delete(ctx context.Context, resource core_model.Resource, fs ...store.DeleteOptionsFunc) error {
opts := store.NewDeleteOptions(fs...)
if opts.Name == core_model.DefaultMesh {
opts.Name += ".universal"
}
name, _, err := util_k8s.CoreNameToK8sName(opts.Name)
if err != nil {
return err
}
switch resource.Descriptor().Name {
case mesh.DataplaneType:
// 不支持删除
case mesh.TagRouteType:
labels := opts.Labels
base := mesh_proto.Base{
Application: labels[mesh_proto.Application],
Service: labels[mesh_proto.Service],
ID: labels[mesh_proto.ID],
ServiceVersion: labels[mesh_proto.ServiceVersion],
ServiceGroup: labels[mesh_proto.ServiceGroup],
}
key := mesh_proto.BuildServiceKey(base)
path := mesh_proto.GetRoutePath(key, consts.TagRoute)
err := t.governance.DeleteConfig(path)
if err != nil {
return err
}
case mesh.ConditionRouteType:
labels := opts.Labels
base := mesh_proto.Base{
Application: labels[mesh_proto.Application],
Service: labels[mesh_proto.Service],
ID: labels[mesh_proto.ID],
ServiceVersion: labels[mesh_proto.ServiceVersion],
ServiceGroup: labels[mesh_proto.ServiceGroup],
}
key := mesh_proto.BuildServiceKey(base)
path := mesh_proto.GetRoutePath(key, consts.ConditionRoute)
err := t.governance.DeleteConfig(path)
if err != nil {
return err
}
case mesh.DynamicConfigType:
labels := opts.Labels
base := mesh_proto.Base{
Application: labels[mesh_proto.Application],
Service: labels[mesh_proto.Service],
ID: labels[mesh_proto.ID],
ServiceVersion: labels[mesh_proto.ServiceVersion],
ServiceGroup: labels[mesh_proto.ServiceGroup],
}
key := mesh_proto.BuildServiceKey(base)
path := mesh_proto.GetOverridePath(key)
_, err := t.governance.GetConfig(path)
if err != nil {
logger.Sugar().Error(err.Error())
return err
}
err = t.governance.DeleteConfig(path)
if err != nil {
return err
}
case mesh.AffinityRouteType:
labels := opts.Labels
base := mesh_proto.Base{
Application: labels[mesh_proto.Application],
Service: labels[mesh_proto.Service],
ID: labels[mesh_proto.ID],
ServiceVersion: labels[mesh_proto.ServiceVersion],
ServiceGroup: labels[mesh_proto.ServiceGroup],
}
key := mesh_proto.BuildServiceKey(base)
path := mesh_proto.GetRoutePath(key, consts.AffinityRoute)
_, err := t.governance.GetConfig(path)
if err != nil {
logger.Sugar().Error(err.Error())
return err
}
err = t.governance.DeleteConfig(path)
if err != nil {
return err
}
case mesh.MappingType:
// service.UpdateMapping()
// service.DeleteMapping()
case mesh.MetaDataType:
// service.DeleteMeta()
default:
path := GenerateCpGroupPath(string(resource.Descriptor().Name), name)
err = t.regClient.DeleteContent(path)
if err != nil {
return err
}
}
if t.eventWriter != nil {
go func() {
t.eventWriter.Send(events.ResourceChangedEvent{
Operation: events.Delete,
Type: resource.Descriptor().Name,
Key: core_model.ResourceKey{
Mesh: opts.Mesh,
Name: name,
},
})
}()
}
return nil
}
func (c *traditionalStore) Get(_ context.Context, resource core_model.Resource, fs ...store.GetOptionsFunc) error {
opts := store.NewGetOptions(fs...)
name := opts.Name
switch resource.Descriptor().Name {
case mesh.DataplaneType:
// 根据 address 匹配
instances := c.appContext.GetAllInstances()
for appName, i2 := range instances {
for _, ins := range i2 {
resourceMeta := &resourceMetaObject{
Name: ins.GetAddress(),
Mesh: opts.Mesh,
Labels: make(map[string]string),
}
resourceMeta.Labels[mesh_proto.Application] = appName
resourceMeta.Labels[mesh_proto.Revision] = ins.GetMetadata()[dubboconstant.ExportedServicesRevisionPropertyName]
resource.SetMeta(resourceMeta)
dataplaneResource := resource.(*mesh.DataplaneResource)
dataplaneResource.Spec.Networking = &mesh_proto.Dataplane_Networking{}
dataplaneResource.Spec.Extensions = map[string]string{}
dataplaneResource.Spec.Extensions[mesh_proto.Application] = appName
dataplaneResource.Spec.Extensions[mesh_proto.Revision] = ins.GetMetadata()[dubboconstant.ExportedServicesRevisionPropertyName]
dataplaneResource.Spec.Networking.Address = ins.GetAddress()
inbound := &mesh_proto.Dataplane_Networking_Inbound{
Port: uint32(ins.GetPort()),
Address: ins.GetAddress(),
Tags: ins.GetMetadata(),
}
dataplaneResource.Spec.Networking.Inbound = append(dataplaneResource.Spec.Networking.Inbound, inbound)
if opts.Predicate(resource) {
return nil
}
}
}
case mesh.TagRouteType:
labels := opts.Labels
base := mesh_proto.Base{
Application: labels[mesh_proto.Application],
Service: labels[mesh_proto.Service],
ID: labels[mesh_proto.ID],
ServiceVersion: labels[mesh_proto.ServiceVersion],
ServiceGroup: labels[mesh_proto.ServiceGroup],
}
id := mesh_proto.BuildServiceKey(base)
path := mesh_proto.GetRoutePath(id, consts.TagRoute)
cfg, err := c.governance.GetConfig(path)
if err != nil {
return err
}
if cfg != "" {
res := &mesh_proto.TagRoute{}
if err := core_model.FromYAML([]byte(cfg), res); err != nil {
return errors.Wrap(err, "failed to convert json to spec")
}
err = resource.SetSpec(res)
if err != nil {
panic(err)
}
} else {
return core_store.ErrorResourceNotFound(resource.Descriptor().Name, opts.Name, opts.Mesh)
}
resource.SetMeta(&resourceMetaObject{
Name: path,
Mesh: opts.Mesh,
})
case mesh.ConditionRouteType:
labels := opts.Labels
base := mesh_proto.Base{
Application: labels[mesh_proto.Application],
Service: labels[mesh_proto.Service],
ID: labels[mesh_proto.ID],
ServiceVersion: labels[mesh_proto.ServiceVersion],
ServiceGroup: labels[mesh_proto.ServiceGroup],
}
id := mesh_proto.BuildServiceKey(base)
path := mesh_proto.GetRoutePath(id, consts.ConditionRoute)
cfg, err := c.governance.GetConfig(path)
if err != nil {
return err
}
if cfg != "" {
res, err := mesh_proto.ConditionRouteDecodeFromYAML([]byte(cfg))
if err != nil {
return err
}
err = resource.SetSpec(res)
if err != nil {
panic(err)
}
} else {
return core_store.ErrorResourceNotFound(resource.Descriptor().Name, opts.Name, opts.Mesh)
}
resource.SetMeta(&resourceMetaObject{
Name: path,
Mesh: opts.Mesh,
})
case mesh.DynamicConfigType:
labels := opts.Labels
base := mesh_proto.Base{
Application: labels[mesh_proto.Application],
Service: labels[mesh_proto.Service],
ID: labels[mesh_proto.ID],
ServiceVersion: labels[mesh_proto.ServiceVersion],
ServiceGroup: labels[mesh_proto.ServiceGroup],
}
key := mesh_proto.BuildServiceKey(base)
path := mesh_proto.GetOverridePath(key)
cfg, err := c.governance.GetConfig(path)
if err != nil {
return err
}
if cfg != "" {
data := &mesh_proto.DynamicConfig{}
if err := core_model.FromYAML([]byte(cfg), data); err != nil {
return errors.Wrap(err, "failed to convert json to spec")
}
err = resource.SetSpec(data)
if err != nil {
panic(err)
}
} else {
return core_store.ErrorResourceNotFound(resource.Descriptor().Name, opts.Name, opts.Mesh)
}
resource.SetMeta(&resourceMetaObject{
Name: path,
Mesh: opts.Mesh,
})
case mesh.AffinityRouteType:
labels := opts.Labels
base := mesh_proto.Base{
Application: labels[mesh_proto.Application],
Service: labels[mesh_proto.Service],
ID: labels[mesh_proto.ID],
ServiceVersion: labels[mesh_proto.ServiceVersion],
ServiceGroup: labels[mesh_proto.ServiceGroup],
}
key := mesh_proto.BuildServiceKey(base)
path := mesh_proto.GetRoutePath(key, consts.AffinityRoute)
cfg, err := c.governance.GetConfig(path)
if err != nil {
return err
}
if cfg != "" {
data := &mesh_proto.AffinityRoute{}
if err := core_model.FromYAML([]byte(cfg), data); err != nil {
return errors.Wrap(err, "failed to convert json to spec")
}
err = resource.SetSpec(data)
if err != nil {
panic(err)
}
} else {
return core_store.ErrorResourceNotFound(resource.Descriptor().Name, opts.Name, opts.Mesh)
}
resource.SetMeta(&resourceMetaObject{
Name: path,
Mesh: opts.Mesh,
})
case mesh.MappingType:
// Get通过Key获取, 不设置listener
mappings := c.appContext.GetMapping()
meta := &resourceMetaObject{
Name: name,
Mesh: opts.Mesh,
}
resource.SetMeta(meta)
mapping := resource.GetSpec().(*mesh_proto.Mapping)
mapping.Zone = "default"
mapping.InterfaceName = name
var items []string
for k := range mappings[name].Items {
items = append(items, fmt.Sprintf("%v", k))
}
mapping.ApplicationNames = items
resource.SetMeta(&resourceMetaObject{
Name: name,
Mesh: opts.Mesh,
})
case mesh.MetaDataType:
// 拆分name得到revision和app
revision, err2 := extractRevision(name, opts, c)
if err2 != nil {
return err2
}
var appMetadata *common.MetadataInfo
if opts.Type == "interface" {
appMetadata = c.infContext.GetMetadata(revision)
} else {
appMetadata = c.appContext.GetRevisionToMetadata(revision)
}
if appMetadata == nil {
return nil
}
metaData := resource.GetSpec().(*mesh_proto.MetaData)
metaData.App = appMetadata.App
metaData.Revision = appMetadata.Revision
service := map[string]*mesh_proto.ServiceInfo{}
for key, serviceInfo := range appMetadata.Services {
var params map[string]string
if serviceInfo.URL != nil {
params = serviceInfo.URL.ToMap()
} else {
params = serviceInfo.Params
}
service[key] = &mesh_proto.ServiceInfo{
Name: serviceInfo.Name,
Group: serviceInfo.Group,
Version: serviceInfo.Version,
Protocol: serviceInfo.Protocol,
Path: serviceInfo.Path,
Params: params,
}
}
metaData.Services = service
resource.SetMeta(&resourceMetaObject{
Name: name,
Mesh: opts.Mesh,
})
default:
path := GenerateCpGroupPath(string(resource.Descriptor().Name), name)
value, err := c.regClient.GetContent(path)
if err != nil {
return err
}
if err := core_model.FromYAML(value, resource.GetSpec()); err != nil {
return err
}
resource.SetMeta(&resourceMetaObject{
Name: name,
Mesh: opts.Mesh,
})
}
return nil
}
func extractRevision(name string, opts *store.GetOptions, c *traditionalStore) (string, error) {
if opts.Labels[mesh_proto.Revision] != "" {
return opts.Labels[mesh_proto.Revision], nil
} else {
app, revision := splitAppAndRevision(name)
if revision == "" {
children, err := c.regClient.GetChildren(getMetadataPath(app))
if err != nil {
return "", err
}
revision = children[0]
}
return revision, nil
}
}
func (c *traditionalStore) List(_ context.Context, resources core_model.ResourceList, fs ...store.ListOptionsFunc) error {
opts := store.NewListOptions(fs...)
switch resources.GetItemType() {
case mesh.DataplaneType:
appInstances := c.appContext.GetAllInstances()
infInstances := c.infContext.GetAllInstances()
allInstances := registry.MergeInstances(appInstances, infInstances)
// iterator services key set
for _, instances := range allInstances {
for _, ins := range instances {
key := ins.GetServiceName()
item := resources.NewItem()
dataplaneResource := item.(*mesh.DataplaneResource)
resourceMeta := &resourceMetaObject{
Name: ins.GetAddress(),
Mesh: core_model.DefaultMesh,
Labels: make(map[string]string),
}
resourceMeta.Labels[mesh_proto.Application] = key
resourceMeta.Labels[mesh_proto.Revision] = ins.GetMetadata()[dubboconstant.ExportedServicesRevisionPropertyName]
dataplaneResource.SetMeta(resourceMeta)
dataplaneResource.Spec.Networking = &mesh_proto.Dataplane_Networking{}
dataplaneResource.Spec.Extensions = map[string]string{}
dataplaneResource.Spec.Extensions[mesh_proto.Application] = key
dataplaneResource.Spec.Extensions[mesh_proto.Revision] = ins.GetMetadata()[dubboconstant.ExportedServicesRevisionPropertyName]
for k, v := range ins.GetMetadata() {
dataplaneResource.Spec.Extensions[k] = v
}
inbound := &mesh_proto.Dataplane_Networking_Inbound{
Port: uint32(ins.GetPort()),
Address: ins.GetAddress(),
Tags: ins.GetMetadata(),
}
dataplaneResource.Spec.Networking.Inbound = append(dataplaneResource.Spec.Networking.Inbound, inbound)
dataplaneResource.Spec.Networking.Address = ins.GetAddress()
if opts.Predicate(item) {
err := resources.AddItem(item)
if err != nil {
return err
}
}
}
}
case mesh.MappingType:
appMappings := c.appContext.GetMapping()
infMappings := c.infContext.GetMapping()
mappings := registry.MergeMapping(appMappings, infMappings)
for key, set := range mappings {
meta := &resourceMetaObject{
Name: key,
}
item := resources.NewItem()
item.SetMeta(meta)
mapping := item.GetSpec().(*mesh_proto.Mapping)
mapping.Zone = "default"
mapping.InterfaceName = key
var items []string
for k := range set.Items {
items = append(items, fmt.Sprintf("%v", k))
}
mapping.ApplicationNames = items
if opts.Predicate(item) {
err := resources.AddItem(item)
if err != nil {
return err
}
}
}
case mesh.MetaDataType:
// app metadata
// inf metadata
// 1. 获取到所有的key, key是application(应用名)
for app, instances := range c.appContext.GetAllInstances() {
// 2. 获取到该应用名下所有的revision
revisions := make(map[string]struct{})
for _, instance := range instances {
revisions[instance.GetMetadata()[dubboconstant.ExportedServicesRevisionPropertyName]] = struct{}{}
}
for revision := range revisions {
appMetadata := c.appContext.GetRevisionToMetadata(revision)
if appMetadata == nil {
log.Error(nil, "Err loading app metadata with id %s", dubbo_identifier.NewSubscriberMetadataIdentifier(app, revision))
continue
}
item := resources.NewItem()
metaData := item.GetSpec().(*mesh_proto.MetaData)
metaData.App = appMetadata.App
metaData.Revision = appMetadata.Revision
service := map[string]*mesh_proto.ServiceInfo{}
for key, serviceInfo := range appMetadata.Services {
service[key] = &mesh_proto.ServiceInfo{
Name: serviceInfo.Name,
Group: serviceInfo.Group,
Version: serviceInfo.Version,
Protocol: serviceInfo.Protocol,
Path: serviceInfo.Path,
Params: serviceInfo.Params,
}
}
metaData.Services = service
resourceMeta := &resourceMetaObject{
Name: app,
Version: revision,
}
resourceMeta.Labels[mesh_proto.Application] = app
resourceMeta.Labels[mesh_proto.Revision] = revision
item.SetMeta(resourceMeta)
if opts.Predicate(item) {
err := resources.AddItem(item)
if err != nil {
return err
}
}
}
}
case mesh.DynamicConfigType:
cfg, err := c.governance.GetList(consts.ConfiguratorRuleSuffix)
if err != nil {
return err
}
for name, rule := range cfg {
newIt := resources.NewItem()
ConfiguratorCfg, err := parseConfiguratorConfig(rule)
_ = newIt.SetSpec(ConfiguratorCfg)
meta := &resourceMetaObject{
Name: name,
Mesh: opts.Mesh,
Labels: maps.Clone(opts.Labels),
}
if err != nil {
logger.Errorf("failed to parse dynamicConfig rule: %s : %s, %s", name, rule, err.Error())
continue
}
newIt.SetMeta(meta)
_ = resources.AddItem(newIt)
}
case mesh.TagRouteType:
cfg, err := c.governance.GetList(consts.TagRuleSuffix)
if err != nil {
return err
}
for name, rule := range cfg {
newIt := resources.NewItem()
ConfiguratorCfg, err := parseTagConfig(rule)
_ = newIt.SetSpec(ConfiguratorCfg)
meta := &resourceMetaObject{
Name: name,
Mesh: opts.Mesh,
Labels: maps.Clone(opts.Labels),
}
if err != nil {
logger.Errorf("failed to parse tag rule: %s : %s, %s", name, rule, err.Error())
continue
}
newIt.SetMeta(meta)
_ = resources.AddItem(newIt)
}
case mesh.ConditionRouteType:
cfg, err := c.governance.GetList(consts.ConditionRuleSuffix)
if err != nil {
return err
}
for name, rule := range cfg {
newIt := resources.NewItem()
ConfiguratorCfg, err := parseConditionConfig(rule)
if err != nil {
logger.Errorf("failed to parse condition rule: %s : %s, %s", name, rule, err.Error())
continue
} else {
_ = newIt.SetSpec(ConfiguratorCfg)
meta := &resourceMetaObject{
Name: name,
Mesh: opts.Mesh,
Labels: maps.Clone(opts.Labels),
}
newIt.SetMeta(meta)
_ = resources.AddItem(newIt)
}
}
case mesh.AffinityRouteType:
cfg, err := c.governance.GetList(consts.AffinityRuleSuffix)
if err != nil {
return err
}
for name, rule := range cfg {
newIt := resources.NewItem()
ConfiguratorCfg, err := parseAffinityConfig(rule)
if err != nil {
logger.Errorf("failed to parse condition rule: %s : %s, %s", name, rule, err.Error())
continue
} else {
_ = newIt.SetSpec(ConfiguratorCfg)
meta := &resourceMetaObject{
Name: name,
Mesh: opts.Mesh,
Labels: maps.Clone(opts.Labels),
}
newIt.SetMeta(meta)
_ = resources.AddItem(newIt)
}
}
default:
rootDir := getDubboCpPath(string(resources.GetItemType()))
names, err := c.regClient.GetChildren(rootDir)
if err != nil {
return err
}
for _, name := range names {
path := getDubboCpPath(string(resources.GetItemType()), name)
bytes, err := c.regClient.GetContent(path)
if err != nil {
return err
}
item := resources.NewItem()
if err = core_model.FromYAML(bytes, item.GetSpec()); err != nil {
return err
}
item.SetMeta(&resourceMetaObject{
Name: name,
Labels: maps.Clone(opts.Labels),
})
err = resources.AddItem(item)
if err != nil {
return err
}
}
}
return nil
}
// copyResource, todo is copy necessary since they are of the same type?
func (c *traditionalStore) copyResource(key any, dst core_model.Resource, src core_model.Resource, opts *store.ListOptions) (bool, error) {
if opts.NameEquals != key.(string) {
return false, nil
} else {
name := opts.NameContains
if name != "" && !strings.Contains(key.(string), name) {
return false, nil
}
}
dst.SetMeta(src.GetMeta())
err := dst.SetSpec(src.GetSpec())
return true, err
}