blob: 733dc2d49a8e2b3a66d3398b3f818f2c6dd2f0b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package context
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"hash/fnv"
"slices"
)
import (
"github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-kubernetes/pkg/core/dns/lookup"
core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/system"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/registry"
core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
xds_topology "github.com/apache/dubbo-kubernetes/pkg/xds/topology"
)
type meshContextBuilder struct {
rm manager.ReadOnlyResourceManager
typeSet map[core_model.ResourceType]struct{}
ipFunc lookup.LookupIPFunc
zone string
}
type MeshContextBuilder interface {
// BuildGlobalContextIfChanged builds GlobalContext only if `latest` is nil or hash is different
// If hash is the same, the return `latest`
BuildGlobalContextIfChanged(ctx context.Context, latest *GlobalContext) (*GlobalContext, error)
// BuildIfChanged builds MeshContext only if latestMeshCtx is nil or hash of
// latestMeshCtx is different.
// If hash is the same, then the function returns the passed latestMeshCtx.
// Hash returned in MeshContext can never be empty.
BuildIfChanged(ctx context.Context, meshName string, latestMeshCtx *MeshContext) (*MeshContext, error)
}
func NewMeshContextBuilder(
rm manager.ReadOnlyResourceManager,
types []core_model.ResourceType, // types that should be taken into account when MeshContext is built.
ipFunc lookup.LookupIPFunc,
zone string,
) MeshContextBuilder {
typeSet := map[core_model.ResourceType]struct{}{}
for _, typ := range types {
typeSet[typ] = struct{}{}
}
return &meshContextBuilder{
rm: rm,
typeSet: typeSet,
ipFunc: ipFunc,
zone: zone,
}
}
func (m *meshContextBuilder) BuildGlobalContextIfChanged(ctx context.Context, latest *GlobalContext) (*GlobalContext, error) {
rmap := ResourceMap{}
for t := range m.typeSet {
desc, err := registry.Global().DescriptorFor(t)
if err != nil {
return nil, err
}
if desc.Scope == core_model.ScopeGlobal && desc.Name != system.ConfigType { // For config we ignore them atm and prefer to rely on more specific filters.
rmap[t], err = m.fetchResourceList(ctx, t, nil, nil)
if err != nil {
return nil, errors.Wrap(err, "failed to build global context")
}
}
}
newHash := rmap.Hash()
if latest != nil && bytes.Equal(newHash, latest.hash) {
return latest, nil
}
return &GlobalContext{
hash: newHash,
ResourceMap: rmap,
}, nil
}
func (m *meshContextBuilder) BuildBaseMeshContextIfChanged(ctx context.Context, meshName string, latest *BaseMeshContext) (*BaseMeshContext, error) {
mesh := core_mesh.NewMeshResource()
if err := m.rm.Get(ctx, mesh, core_store.GetByKey(meshName, core_model.NoMesh)); err != nil {
return nil, err
}
rmap := ResourceMap{}
// Add the mesh to the resourceMap
rmap[core_mesh.MeshType] = mesh.Descriptor().NewList()
_ = rmap[core_mesh.MeshType].AddItem(mesh)
rmap[core_mesh.MeshType].GetPagination().SetTotal(1)
for t := range m.typeSet {
desc, err := registry.Global().DescriptorFor(t)
if err != nil {
return nil, err
}
// Only pick the policies, gateways, external services and the vip config map
switch {
case desc.IsPolicy:
rmap[t], err = m.fetchResourceList(ctx, t, mesh, nil)
// ignore system.ConfigType for now
default:
// DO nothing we're not interested in this type
}
if err != nil {
return nil, errors.Wrap(err, "failed to build base mesh context")
}
}
newHash := rmap.Hash()
if latest != nil && bytes.Equal(newHash, latest.hash) {
return latest, nil
}
return &BaseMeshContext{
hash: newHash,
Mesh: mesh,
ResourceMap: rmap,
}, nil
}
func (m meshContextBuilder) BuildIfChanged(ctx context.Context, meshName string, latestMeshCtx *MeshContext) (*MeshContext, error) {
globalContext, err := m.BuildGlobalContextIfChanged(ctx, nil)
if err != nil {
return nil, err
}
baseMeshContext, err := m.BuildBaseMeshContextIfChanged(ctx, meshName, nil)
if err != nil {
return nil, err
}
var managedTypes []core_model.ResourceType // The types not managed by global
resources := NewResources()
for resType := range m.typeSet {
rl, ok := globalContext.ResourceMap[resType]
if ok {
// Exists in global context take it from there
resources.MeshLocalResources[resType] = rl
} else {
rl, ok = baseMeshContext.ResourceMap[resType]
if ok { // Exist in the baseMeshContext take it from there
resources.MeshLocalResources[resType] = rl
} else { // absent from all parent contexts get it now
managedTypes = append(managedTypes, resType)
rl, err = m.fetchResourceList(ctx, resType, baseMeshContext.Mesh, nil)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("could not fetch resources of type:%s", resType))
}
resources.MeshLocalResources[resType] = rl
}
}
}
newHash := base64.StdEncoding.EncodeToString(m.hash(globalContext, managedTypes, resources))
if latestMeshCtx != nil && newHash == latestMeshCtx.Hash {
return latestMeshCtx, nil
}
dataplanes := resources.Dataplanes().Items
dataplanesByName := make(map[string]*core_mesh.DataplaneResource, len(dataplanes))
for _, dp := range dataplanes {
dataplanesByName[dp.Meta.GetName()] = dp
}
mesh := baseMeshContext.Mesh
zoneIngresses := resources.ZoneIngresses().Items
endpointMap := xds_topology.BuildEdsEndpoint(m.zone, dataplanes, zoneIngresses)
return &MeshContext{
Hash: newHash,
Resource: mesh,
Resources: resources,
DataplanesByName: dataplanesByName,
EndpointMap: endpointMap,
}, nil
}
type filterFn = func(rs core_model.Resource) bool
func (m *meshContextBuilder) fetchResourceList(ctx context.Context, resType core_model.ResourceType, mesh *core_mesh.MeshResource, filterFn filterFn) (core_model.ResourceList, error) {
var listOptsFunc []core_store.ListOptionsFunc
desc, err := registry.Global().DescriptorFor(resType)
if err != nil {
return nil, err
}
switch desc.Scope {
case core_model.ScopeGlobal:
case core_model.ScopeMesh:
if mesh != nil {
listOptsFunc = append(listOptsFunc, core_store.ListByMesh(mesh.GetMeta().GetName()))
}
default:
return nil, fmt.Errorf("unknown resource scope:%s", desc.Scope)
}
listOptsFunc = append(listOptsFunc, core_store.ListOrdered())
list := desc.NewList()
// TODO: Currently, We only interested in Dataplane, Mapping, Mesh and MetaData
acceptedTypes := map[core_model.ResourceType]struct{}{
core_mesh.DataplaneType: {},
core_mesh.MappingType: {},
core_mesh.MeshType: {},
core_mesh.MetaDataType: {},
}
if _, ok := acceptedTypes[resType]; !ok {
// ignore non-dataplane resources
return list, nil
}
if err := m.rm.List(ctx, list, listOptsFunc...); err != nil {
return nil, err
}
if resType != core_mesh.ZoneIngressType && resType != core_mesh.DataplaneType && filterFn == nil {
// No post processing stuff so return the list as is
return list, nil
}
list, err = modifyAllEntries(list, func(resource core_model.Resource) (core_model.Resource, error) {
if filterFn != nil && !filterFn(resource) {
return nil, nil
}
switch resType {
case core_mesh.DataplaneType:
list, err = modifyAllEntries(list, func(resource core_model.Resource) (core_model.Resource, error) {
dp, ok := resource.(*core_mesh.DataplaneResource)
if !ok {
return nil, errors.New("entry is not a dataplane this shouldn't happen")
}
zi, err := xds_topology.ResolveDataplaneAddress(m.ipFunc, dp)
if err != nil {
return nil, nil
}
return zi, nil
})
}
return resource, nil
})
if err != nil {
return nil, err
}
return list, nil
}
// takes a resourceList and modify it as needed
func modifyAllEntries(list core_model.ResourceList, fn func(resource core_model.Resource) (core_model.Resource, error)) (core_model.ResourceList, error) {
newList := list.NewItem().Descriptor().NewList()
for _, v := range list.GetItems() {
ni, err := fn(v)
if err != nil {
return nil, err
}
if ni != nil {
err := newList.AddItem(ni)
if err != nil {
return nil, err
}
}
}
// it is meaningless temporarily
newList.GetPagination().SetTotal(uint32(len(newList.GetItems())))
return newList, nil
}
func (m *meshContextBuilder) hash(globalContext *GlobalContext, managedTypes []core_model.ResourceType, resources Resources) []byte {
slices.Sort(managedTypes)
hasher := fnv.New128a()
_, _ = hasher.Write(globalContext.hash)
for _, resType := range managedTypes {
_, _ = hasher.Write(core_model.ResourceListHash(resources.MeshLocalResources[resType]))
}
return hasher.Sum(nil)
}