Merge branch 'main' into xds-server
diff --git a/pkg/core/runtime/builder.go b/pkg/core/runtime/builder.go
index 55122a4..d86617c 100644
--- a/pkg/core/runtime/builder.go
+++ b/pkg/core/runtime/builder.go
@@ -208,6 +208,10 @@
return b
}
+func (b *Builder) MeshCache() *mesh.Cache {
+ return b.meshCache
+}
+
func (b *Builder) WithDDSContext(ddsctx *dds_context.Context) *Builder {
b.ddsctx = ddsctx
return b
@@ -281,6 +285,9 @@
if b.dps == nil {
return nil, errors.Errorf("DpServer has not been configured")
}
+ if b.meshCache == nil {
+ return nil, errors.Errorf("MeshCache has not been configured")
+ }
return &runtime{
RuntimeInfo: b.runtimeInfo,
@@ -304,6 +311,7 @@
serviceDiscovery: b.serviceDiscover,
rv: b.rv,
appCtx: b.appCtx,
+ meshCache: b.meshCache,
regClient: b.regClient,
},
Manager: b.cm,
diff --git a/pkg/core/runtime/runtime.go b/pkg/core/runtime/runtime.go
index e2e6496..4d380e1 100644
--- a/pkg/core/runtime/runtime.go
+++ b/pkg/core/runtime/runtime.go
@@ -44,6 +44,7 @@
dds_context "github.com/apache/dubbo-kubernetes/pkg/dds/context"
dp_server "github.com/apache/dubbo-kubernetes/pkg/dp-server/server"
"github.com/apache/dubbo-kubernetes/pkg/events"
+ "github.com/apache/dubbo-kubernetes/pkg/xds/cache/mesh"
xds_runtime "github.com/apache/dubbo-kubernetes/pkg/xds/runtime"
)
@@ -87,6 +88,7 @@
// AppContext returns a context.Context which tracks the lifetime of the apps, it gets cancelled when the app is starting to shutdown.
AppContext() context.Context
XDS() xds_runtime.XDSRuntimeContext
+ MeshCache() *mesh.Cache
}
type ResourceValidators struct {
@@ -167,6 +169,7 @@
adminRegistry *registry.Registry
governance governance.GovernanceConfig
appCtx context.Context
+ meshCache *mesh.Cache
regClient reg_client.RegClient
serviceDiscovery dubboRegistry.ServiceDiscovery
}
@@ -203,6 +206,10 @@
return b.metadataReportCenter
}
+func (b *runtimeContext) MeshCache() *mesh.Cache {
+ return b.meshCache
+}
+
func (rc *runtimeContext) DDSContext() *dds_context.Context {
return rc.ddsctx
}
diff --git a/pkg/core/xds/types.go b/pkg/core/xds/types.go
index 3380391..4035d13 100644
--- a/pkg/core/xds/types.go
+++ b/pkg/core/xds/types.go
@@ -163,6 +163,7 @@
type ExternalServiceDynamicPolicies map[ServiceName]PluginOriginatedPolicies
type MeshIngressResources struct {
+ Mesh *core_mesh.MeshResource
EndpointMap EndpointMap
Resources map[core_model.ResourceType]core_model.ResourceList
}
diff --git a/pkg/xds/context/aggregate_mesh_context.go b/pkg/xds/context/aggregate_mesh_context.go
new file mode 100644
index 0000000..fb2e5cb
--- /dev/null
+++ b/pkg/xds/context/aggregate_mesh_context.go
@@ -0,0 +1,57 @@
+package context
+
+import (
+ "context"
+)
+
+import (
+ core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
+ "github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
+ core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
+ "github.com/apache/dubbo-kubernetes/pkg/xds/cache/sha256"
+)
+
+type meshContextFetcher = func(ctx context.Context, meshName string) (MeshContext, error)
+
+// AggregateMeshContexts fetches the MeshContext of every listed mesh; meshes whose context is not found are left out of the result.
+func AggregateMeshContexts(
+ ctx context.Context,
+ resManager manager.ReadOnlyResourceManager,
+ fetcher meshContextFetcher,
+) (AggregatedMeshContexts, error) {
+ var meshList core_mesh.MeshResourceList
+ if err := resManager.List(ctx, &meshList, core_store.ListOrdered()); err != nil {
+ return AggregatedMeshContexts{}, err
+ }
+
+ var meshes []*core_mesh.MeshResource // only the meshes that do have a MeshContext
+ var meshContexts []MeshContext
+ meshContextsByName := map[string]MeshContext{}
+ for _, mesh := range meshList.Items {
+ meshCtx, err := fetcher(ctx, mesh.GetMeta().GetName())
+ if err != nil {
+ if core_store.IsResourceNotFound(err) {
+ // The mesh was most likely removed after being listed: skip it everywhere so that MustGetMeshContext cannot panic on it.
+ continue
+ }
+ return AggregatedMeshContexts{}, err
+ }
+ meshes = append(meshes, mesh)
+ meshContexts = append(meshContexts, meshCtx)
+ meshContextsByName[mesh.Meta.GetName()] = meshCtx
+ }
+
+ return AggregatedMeshContexts{
+ Hash: aggregatedHash(meshContexts),
+ Meshes: meshes,
+ MeshContextsByName: meshContextsByName,
+ }, nil
+}
+
+func aggregatedHash(meshContexts []MeshContext) string {
+ var hash string
+ for _, meshCtx := range meshContexts {
+ hash += meshCtx.Hash
+ }
+ return sha256.Hash(hash)
+}
diff --git a/pkg/xds/context/context.go b/pkg/xds/context/context.go
index ea3a57c..8537db0 100644
--- a/pkg/xds/context/context.go
+++ b/pkg/xds/context/context.go
@@ -55,6 +55,18 @@
return base64.StdEncoding.EncodeToString(g.hash)
}
+// BaseMeshContext holds for a Mesh a set of resources that are changing less often (policies, external services...)
+type BaseMeshContext struct {
+ Mesh *core_mesh.MeshResource
+ ResourceMap ResourceMap
+ hash []byte
+}
+
+// Hash returns the base64 encoding of the hash; it is mostly used for testing
+func (g BaseMeshContext) Hash() string {
+ return base64.StdEncoding.EncodeToString(g.hash)
+}
+
type MeshContext struct {
Hash string
Resource *core_mesh.MeshResource
@@ -76,3 +88,36 @@
}
return core_mesh.ProtocolUnknown
}
+
+// AggregatedMeshContexts is an aggregate of all MeshContext across all meshes
+type AggregatedMeshContexts struct {
+ Hash string
+ Meshes []*core_mesh.MeshResource
+ MeshContextsByName map[string]MeshContext
+}
+
+// MustGetMeshContext panics if there is no mesh context for given mesh. Call it when iterating over .Meshes
+// There is a guarantee that for every Mesh in .Meshes there is a MeshContext.
+func (m AggregatedMeshContexts) MustGetMeshContext(meshName string) MeshContext {
+ meshCtx, ok := m.MeshContextsByName[meshName]
+ if !ok {
+ panic("there should be a corresponding mesh context for every mesh in mesh contexts")
+ }
+ return meshCtx
+}
+
+func (m AggregatedMeshContexts) AllDataplanes() []*core_mesh.DataplaneResource {
+ var resources []*core_mesh.DataplaneResource
+ for _, mesh := range m.Meshes {
+ meshCtx := m.MustGetMeshContext(mesh.Meta.GetName())
+ resources = append(resources, meshCtx.Resources.Dataplanes().Items...)
+ }
+ return resources
+}
+
+func (m AggregatedMeshContexts) ZoneIngresses() []*core_mesh.ZoneIngressResource {
+ for _, meshCtx := range m.MeshContextsByName {
+ return meshCtx.Resources.ZoneIngresses().Items // all mesh contexts has the same list
+ }
+ return nil
+}
diff --git a/pkg/xds/context/mesh_context_builder.go b/pkg/xds/context/mesh_context_builder.go
index e17c737..733dc2d 100644
--- a/pkg/xds/context/mesh_context_builder.go
+++ b/pkg/xds/context/mesh_context_builder.go
@@ -21,7 +21,9 @@
"bytes"
"context"
"encoding/base64"
+ "fmt"
"hash/fnv"
+ "slices"
)
import (
@@ -85,7 +87,7 @@
return nil, err
}
if desc.Scope == core_model.ScopeGlobal && desc.Name != system.ConfigType { // For config we ignore them atm and prefer to rely on more specific filters.
- rmap[t], err = m.fetchResourceList(ctx, t, nil)
+ rmap[t], err = m.fetchResourceList(ctx, t, nil, nil)
if err != nil {
return nil, errors.Wrap(err, "failed to build global context")
}
@@ -101,22 +103,77 @@
}, nil
}
+func (m *meshContextBuilder) BuildBaseMeshContextIfChanged(ctx context.Context, meshName string, latest *BaseMeshContext) (*BaseMeshContext, error) {
+ mesh := core_mesh.NewMeshResource()
+ if err := m.rm.Get(ctx, mesh, core_store.GetByKey(meshName, core_model.NoMesh)); err != nil {
+ return nil, err
+ }
+ rmap := ResourceMap{}
+ // Add the mesh to the resourceMap
+ rmap[core_mesh.MeshType] = mesh.Descriptor().NewList()
+ _ = rmap[core_mesh.MeshType].AddItem(mesh)
+ rmap[core_mesh.MeshType].GetPagination().SetTotal(1)
+ for t := range m.typeSet {
+ desc, err := registry.Global().DescriptorFor(t)
+ if err != nil {
+ return nil, err
+ }
+ // Only policies are picked here; every other type falls through to the default branch and is skipped
+ switch {
+ case desc.IsPolicy:
+ rmap[t], err = m.fetchResourceList(ctx, t, mesh, nil)
+ // ignore system.ConfigType for now
+ default:
+ // Do nothing: we're not interested in this type
+ }
+ if err != nil {
+ return nil, errors.Wrap(err, "failed to build base mesh context")
+ }
+ }
+ newHash := rmap.Hash()
+ if latest != nil && bytes.Equal(newHash, latest.hash) {
+ return latest, nil
+ }
+ return &BaseMeshContext{
+ hash: newHash,
+ Mesh: mesh,
+ ResourceMap: rmap,
+ }, nil
+}
+
func (m meshContextBuilder) BuildIfChanged(ctx context.Context, meshName string, latestMeshCtx *MeshContext) (*MeshContext, error) {
globalContext, err := m.BuildGlobalContextIfChanged(ctx, nil)
if err != nil {
return nil, err
}
+ baseMeshContext, err := m.BuildBaseMeshContextIfChanged(ctx, meshName, nil)
+ if err != nil {
+ return nil, err
+ }
+ var managedTypes []core_model.ResourceType // The types not managed by global
resources := NewResources()
for resType := range m.typeSet {
rl, ok := globalContext.ResourceMap[resType]
if ok {
// Exists in global context take it from there
resources.MeshLocalResources[resType] = rl
+ } else {
+ rl, ok = baseMeshContext.ResourceMap[resType]
+ if ok { // Exist in the baseMeshContext take it from there
+ resources.MeshLocalResources[resType] = rl
+ } else { // absent from all parent contexts get it now
+ managedTypes = append(managedTypes, resType)
+ rl, err = m.fetchResourceList(ctx, resType, baseMeshContext.Mesh, nil)
+ if err != nil {
+ return nil, errors.Wrap(err, fmt.Sprintf("could not fetch resources of type:%s", resType))
+ }
+ resources.MeshLocalResources[resType] = rl
+ }
}
}
- newHash := base64.StdEncoding.EncodeToString(m.hash(globalContext))
+ newHash := base64.StdEncoding.EncodeToString(m.hash(globalContext, managedTypes, resources))
if latestMeshCtx != nil && newHash == latestMeshCtx.Hash {
return latestMeshCtx, nil
}
@@ -127,10 +184,13 @@
dataplanesByName[dp.Meta.GetName()] = dp
}
- endpointMap := xds_topology.BuildEdsEndpoint(m.zone, dataplanes, nil)
+ mesh := baseMeshContext.Mesh
+ zoneIngresses := resources.ZoneIngresses().Items
+ endpointMap := xds_topology.BuildEdsEndpoint(m.zone, dataplanes, zoneIngresses)
return &MeshContext{
Hash: newHash,
+ Resource: mesh,
Resources: resources,
DataplanesByName: dataplanesByName,
EndpointMap: endpointMap,
@@ -139,14 +199,34 @@
type filterFn = func(rs core_model.Resource) bool
-func (m *meshContextBuilder) fetchResourceList(ctx context.Context, resType core_model.ResourceType, filterFn filterFn) (core_model.ResourceList, error) {
+func (m *meshContextBuilder) fetchResourceList(ctx context.Context, resType core_model.ResourceType, mesh *core_mesh.MeshResource, filterFn filterFn) (core_model.ResourceList, error) {
var listOptsFunc []core_store.ListOptionsFunc
desc, err := registry.Global().DescriptorFor(resType)
if err != nil {
return nil, err
}
+ switch desc.Scope {
+ case core_model.ScopeGlobal:
+ case core_model.ScopeMesh:
+ if mesh != nil {
+ listOptsFunc = append(listOptsFunc, core_store.ListByMesh(mesh.GetMeta().GetName()))
+ }
+ default:
+ return nil, fmt.Errorf("unknown resource scope:%s", desc.Scope)
+ }
listOptsFunc = append(listOptsFunc, core_store.ListOrdered())
list := desc.NewList()
+ // TODO: Currently, we are only interested in Dataplane, Mapping, Mesh and MetaData
+ acceptedTypes := map[core_model.ResourceType]struct{}{
+ core_mesh.DataplaneType: {},
+ core_mesh.MappingType: {},
+ core_mesh.MeshType: {},
+ core_mesh.MetaDataType: {},
+ }
+ if _, ok := acceptedTypes[resType]; !ok {
+ // ignore every resource type that is not listed in acceptedTypes: return the empty list
+ return list, nil
+ }
if err := m.rm.List(ctx, list, listOptsFunc...); err != nil {
return nil, err
}
@@ -154,6 +234,7 @@
// No post processing stuff so return the list as is
return list, nil
}
+
list, err = modifyAllEntries(list, func(resource core_model.Resource) (core_model.Resource, error) {
if filterFn != nil && !filterFn(resource) {
return nil, nil
@@ -200,8 +281,13 @@
return newList, nil
}
-func (m *meshContextBuilder) hash(globalContext *GlobalContext) []byte {
+func (m *meshContextBuilder) hash(globalContext *GlobalContext, managedTypes []core_model.ResourceType, resources Resources) []byte {
+ slices.Sort(managedTypes)
hasher := fnv.New128a()
_, _ = hasher.Write(globalContext.hash)
+ for _, resType := range managedTypes {
+ _, _ = hasher.Write(core_model.ResourceListHash(resources.MeshLocalResources[resType]))
+ }
+
return hasher.Sum(nil)
}
diff --git a/pkg/xds/envoy/endpoints/endpoints.go b/pkg/xds/envoy/endpoints/endpoints.go
index 89afc9a..7e12baa 100644
--- a/pkg/xds/envoy/endpoints/endpoints.go
+++ b/pkg/xds/envoy/endpoints/endpoints.go
@@ -22,7 +22,7 @@
)
import (
- "github.com/golang/protobuf/proto"
+ "google.golang.org/protobuf/proto"
)
import (
diff --git a/pkg/xds/generator/ingress_proxy_generator.go b/pkg/xds/generator/ingress_proxy_generator.go
index 943e3c2..1ae70db 100644
--- a/pkg/xds/generator/ingress_proxy_generator.go
+++ b/pkg/xds/generator/ingress_proxy_generator.go
@@ -19,21 +19,80 @@
import (
"context"
+ "sort"
)
import (
+ envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+
+ "golang.org/x/exp/maps"
+)
+
+import (
+ mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/core"
+ core_xds "github.com/apache/dubbo-kubernetes/pkg/core/xds"
model "github.com/apache/dubbo-kubernetes/pkg/core/xds"
xds_context "github.com/apache/dubbo-kubernetes/pkg/xds/context"
+ envoy_listeners "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/listeners"
+ "github.com/apache/dubbo-kubernetes/pkg/xds/generator/zoneproxy"
)
var ingressLog = core.Log.WithName("ingress-proxy-generator")
// Ingress is a marker to indicate by which ProxyGenerator resources were generated.
-const Ingress = "outbound"
+const Ingress = "ingress"
type IngressGenerator struct{}
func (g IngressGenerator) Generator(ctx context.Context, _ *model.ResourceSet, xdsCtx xds_context.Context, proxy *model.Proxy) (*model.ResourceSet, error) {
- return nil, nil
+ resources := core_xds.NewResourceSet()
+
+ networking := proxy.ZoneIngressProxy.ZoneIngressResource.Spec.GetNetworking()
+ address, port := networking.GetAddress(), networking.GetPort()
+ listenerBuilder := envoy_listeners.NewInboundListenerBuilder(proxy.APIVersion, address, port, core_xds.SocketAddressProtocolTCP).
+ Configure(envoy_listeners.TLSInspector())
+
+ availableSvcsByMesh := map[string][]*mesh_proto.ZoneIngress_AvailableService{}
+ for _, service := range proxy.ZoneIngressProxy.ZoneIngressResource.Spec.AvailableServices {
+ availableSvcsByMesh[service.Mesh] = append(availableSvcsByMesh[service.Mesh], service)
+ }
+
+ for _, mr := range proxy.ZoneIngressProxy.MeshResourceList {
+ meshName := mr.Mesh.GetMeta().GetName()
+ serviceList := maps.Keys(mr.EndpointMap)
+ sort.Strings(serviceList)
+ dest := zoneproxy.BuildMeshDestinations(
+ availableSvcsByMesh[meshName],
+ xds_context.Resources{MeshLocalResources: mr.Resources},
+ )
+
+ services := zoneproxy.AddFilterChains(availableSvcsByMesh[meshName], proxy.APIVersion, listenerBuilder, dest, mr.EndpointMap)
+
+ cdsResources, err := zoneproxy.GenerateCDS(dest, services, proxy.APIVersion, meshName, Ingress)
+ if err != nil {
+ return nil, err
+ }
+ resources.Add(cdsResources...)
+
+ edsResources, err := zoneproxy.GenerateEDS(services, mr.EndpointMap, proxy.APIVersion, meshName, Ingress)
+ if err != nil {
+ return nil, err
+ }
+ resources.Add(edsResources...)
+ }
+
+ listener, err := listenerBuilder.Build()
+ if err != nil {
+ return nil, err
+ }
+ if len(listener.(*envoy_listener_v3.Listener).FilterChains) > 0 {
+ resources.Add(&core_xds.Resource{
+ Name: listener.GetName(),
+ Origin: Ingress,
+ Resource: listener,
+ })
+ }
+
+ return resources, nil
}
diff --git a/pkg/xds/generator/zoneproxy/destinations.go b/pkg/xds/generator/zoneproxy/destinations.go
new file mode 100644
index 0000000..1c2ec56
--- /dev/null
+++ b/pkg/xds/generator/zoneproxy/destinations.go
@@ -0,0 +1,48 @@
+package zoneproxy
+
+import (
+ "reflect"
+)
+
+import (
+ "golang.org/x/exp/slices"
+)
+
+import (
+ mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
+ xds_context "github.com/apache/dubbo-kubernetes/pkg/xds/context"
+ envoy_tags "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/tags"
+)
+
+func BuildMeshDestinations(
+ availableServices []*mesh_proto.ZoneIngress_AvailableService, // available services for a single mesh
+ res xds_context.Resources,
+) map[string][]envoy_tags.Tags {
+ destForMesh := map[string][]envoy_tags.Tags{}
+ addTrafficFlowByDefaultDestination(destForMesh)
+ return destForMesh
+}
+
+// addTrafficFlowByDefaultDestination makes sure that the given destinations
+// always contain a "match all" destination pointing to all services
+// (dubbo.io/service:* -> dubbo.io/service:*); it is added at most once.
+func addTrafficFlowByDefaultDestination(
+ destinations map[string][]envoy_tags.Tags,
+) {
+ // We need to add a destination to route any service to any instance of
+ // that service
+ matchAllTags := envoy_tags.Tags{mesh_proto.ServiceTag: mesh_proto.MatchAllTag}
+ matchAllDestinations := destinations[mesh_proto.MatchAllTag]
+ foundAllServicesDestination := slices.ContainsFunc(
+ matchAllDestinations,
+ func(tagsElem envoy_tags.Tags) bool {
+ return reflect.DeepEqual(tagsElem, matchAllTags)
+ },
+ )
+
+ if !foundAllServicesDestination {
+ matchAllDestinations = append(matchAllDestinations, matchAllTags)
+ }
+
+ destinations[mesh_proto.MatchAllTag] = matchAllDestinations
+}
diff --git a/pkg/xds/generator/zoneproxy/generator.go b/pkg/xds/generator/zoneproxy/generator.go
new file mode 100644
index 0000000..9db2262
--- /dev/null
+++ b/pkg/xds/generator/zoneproxy/generator.go
@@ -0,0 +1,155 @@
+package zoneproxy
+
+import (
+ mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
+ core_xds "github.com/apache/dubbo-kubernetes/pkg/core/xds"
+ envoy_common "github.com/apache/dubbo-kubernetes/pkg/xds/envoy"
+ envoy_clusters "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/clusters"
+ envoy_endpoints "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/endpoints"
+ envoy_listeners "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/listeners"
+ envoy_names "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/names"
+ envoy_tags "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/tags"
+)
+
+func GenerateCDS(
+ destinationsPerService map[string][]envoy_tags.Tags,
+ services envoy_common.Services,
+ apiVersion core_xds.APIVersion,
+ meshName string,
+ origin string,
+) ([]*core_xds.Resource, error) {
+ matchAllDestinations := destinationsPerService[mesh_proto.MatchAllTag]
+
+ var resources []*core_xds.Resource
+ for _, service := range services.Sorted() {
+ clusters := services[service]
+
+ var tagsSlice envoy_tags.TagsSlice
+ for _, cluster := range clusters.Clusters() {
+ tagsSlice = append(tagsSlice, cluster.Tags())
+ }
+ tagSlice := append(tagsSlice, matchAllDestinations...)
+
+ tagKeySlice := tagSlice.ToTagKeysSlice().Transform(
+ envoy_tags.Without(mesh_proto.ServiceTag),
+ )
+
+ clusterName := envoy_names.GetMeshClusterName(meshName, service)
+ edsCluster, err := envoy_clusters.NewClusterBuilder(apiVersion, clusterName).
+ Configure(envoy_clusters.EdsCluster()).
+ Configure(envoy_clusters.LbSubset(tagKeySlice)).
+ Build()
+ if err != nil {
+ return nil, err
+ }
+ resources = append(resources, &core_xds.Resource{
+ Name: clusterName,
+ Origin: origin,
+ Resource: edsCluster,
+ })
+ }
+
+ return resources, nil
+}
+
+func GenerateEDS(
+ services envoy_common.Services,
+ endpointMap core_xds.EndpointMap,
+ apiVersion core_xds.APIVersion,
+ meshName string,
+ origin string,
+) ([]*core_xds.Resource, error) {
+ var resources []*core_xds.Resource
+
+ for _, service := range services.Sorted() {
+ endpoints := endpointMap[service]
+
+ clusterName := envoy_names.GetMeshClusterName(meshName, service)
+ cla, err := envoy_endpoints.CreateClusterLoadAssignment(clusterName, endpoints, apiVersion)
+ if err != nil {
+ return nil, err
+ }
+ resources = append(resources, &core_xds.Resource{
+ Name: clusterName,
+ Origin: origin,
+ Resource: cla,
+ })
+ }
+
+ return resources, nil
+}
+
+// AddFilterChains adds filter chains to a listener. We generate
+// FilterChainsMatcher for each unique destination. This approach has
+// a limitation: additional tags on outbound in Universal mode won't work across
+// different zones.
+func AddFilterChains(
+ availableServices []*mesh_proto.ZoneIngress_AvailableService,
+ apiVersion core_xds.APIVersion,
+ listenerBuilder *envoy_listeners.ListenerBuilder,
+ destinationsPerService map[string][]envoy_tags.Tags,
+ endpointMap core_xds.EndpointMap,
+) envoy_common.Services {
+ servicesAcc := envoy_common.NewServicesAccumulator(nil)
+
+ for _, service := range availableServices {
+ serviceName := service.Tags[mesh_proto.ServiceTag]
+ destinations := destinationsPerService[serviceName]
+ destinations = append(destinations, destinationsPerService[mesh_proto.MatchAllTag]...)
+ clusterName := envoy_names.GetMeshClusterName(service.Mesh, serviceName)
+ serviceEndpoints := endpointMap[serviceName]
+
+ for _, destination := range destinations {
+
+ // relevantTags is a set of tags for which it actually makes sense to do LB split on.
+ // If the endpoint list is the same with or without the tag, we should just not do the split.
+ // This solves the problem that Envoy deduplicates endpoints with the same address and different metadata.
+ // example 1:
+ // Ingress1 (10.0.0.1) supports service:a,version:1 and service:a,version:2
+ // Ingress2 (10.0.0.2) supports service:a,version:1 and service:a,version:2
+ // If we want to split by version, we don't need to do LB subset on version.
+ //
+ // example 2:
+ // Ingress1 (10.0.0.1) supports service:a,version:1
+ // Ingress2 (10.0.0.2) supports service:a,version:2
+ // If we want to split by version, we need LB subset.
+ relevantTags := envoy_tags.Tags{}
+ for key, value := range destination {
+ matchedTargets := map[string]struct{}{}
+ allTargets := map[string]struct{}{}
+ for _, endpoint := range serviceEndpoints {
+ address := endpoint.Address()
+ if endpoint.Tags[key] == value || value == mesh_proto.MatchAllTag {
+ matchedTargets[address] = struct{}{}
+ }
+ allTargets[address] = struct{}{}
+ }
+ if len(matchedTargets) < len(allTargets) {
+ relevantTags[key] = value
+ }
+ }
+
+ cluster := envoy_common.NewCluster(
+ envoy_common.WithName(clusterName),
+ envoy_common.WithService(serviceName),
+ envoy_common.WithTags(relevantTags),
+ )
+ cluster.SetMesh(service.Mesh)
+
+ filterChain := envoy_listeners.FilterChain(
+ envoy_listeners.NewFilterChainBuilder(apiVersion, envoy_common.AnonymousResource).Configure(
+ envoy_listeners.TcpProxyDeprecatedWithMetadata(
+ clusterName,
+ cluster,
+ ),
+ ),
+ )
+
+ listenerBuilder.Configure(filterChain)
+
+ servicesAcc.Add(cluster)
+ }
+ }
+
+ return servicesAcc.Services()
+}
diff --git a/pkg/xds/ingress/dataplane.go b/pkg/xds/ingress/dataplane.go
new file mode 100644
index 0000000..744bd48
--- /dev/null
+++ b/pkg/xds/ingress/dataplane.go
@@ -0,0 +1,116 @@
+package ingress
+
+import (
+ "context"
+ "fmt"
+ "sort"
+ "strings"
+)
+
+import (
+ "golang.org/x/exp/slices"
+
+ "google.golang.org/protobuf/proto"
+)
+
+import (
+ mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
+ core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
+ "github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
+ envoy "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/tags"
+)
+
+// tagSets represents a map from tags (encoded as a string) to the number of instances
+type tagSets map[serviceKey]uint32
+
+type serviceKey struct {
+ mesh string
+ tags string
+}
+
+type serviceKeySlice []serviceKey
+
+func (s serviceKeySlice) Len() int { return len(s) }
+func (s serviceKeySlice) Less(i, j int) bool {
+ return s[i].mesh < s[j].mesh || (s[i].mesh == s[j].mesh && s[i].tags < s[j].tags)
+}
+func (s serviceKeySlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+
+func (sk *serviceKey) String() string {
+ return fmt.Sprintf("%s.%s", sk.tags, sk.mesh)
+}
+
+func (s tagSets) addInstanceOfTags(mesh string, tags envoy.Tags) {
+ strTags := tags.String()
+ s[serviceKey{tags: strTags, mesh: mesh}]++
+}
+
+func (s tagSets) toAvailableServices() []*mesh_proto.ZoneIngress_AvailableService {
+ var result []*mesh_proto.ZoneIngress_AvailableService
+
+ var keys []serviceKey
+ for key := range s {
+ keys = append(keys, key)
+ }
+ sort.Sort(serviceKeySlice(keys))
+
+ for _, key := range keys {
+ tags, _ := envoy.TagsFromString(key.tags) // ignore error since we control how string looks like
+ result = append(result, &mesh_proto.ZoneIngress_AvailableService{
+ Tags: tags,
+ Instances: s[key],
+ Mesh: key.mesh,
+ })
+ }
+ return result
+}
+
+func UpdateAvailableServices(
+ ctx context.Context,
+ rm manager.ResourceManager,
+ ingress *core_mesh.ZoneIngressResource,
+ otherDataplanes []*core_mesh.DataplaneResource,
+ tagFilters []string,
+) error {
+ availableServices := GetIngressAvailableServices(otherDataplanes, tagFilters)
+
+ if availableServicesEqual(availableServices, ingress.Spec.GetAvailableServices()) {
+ return nil
+ }
+ ingress.Spec.AvailableServices = availableServices
+ if err := rm.Update(ctx, ingress); err != nil {
+ return err
+ }
+ return nil
+}
+
+func availableServicesEqual(services []*mesh_proto.ZoneIngress_AvailableService, other []*mesh_proto.ZoneIngress_AvailableService) bool {
+ if len(services) != len(other) {
+ return false
+ }
+ for i := range services {
+ if !proto.Equal(services[i], other[i]) {
+ return false
+ }
+ }
+ return true
+}
+
+func GetIngressAvailableServices(others []*core_mesh.DataplaneResource, tagFilters []string) []*mesh_proto.ZoneIngress_AvailableService {
+ tagSets := tagSets{}
+ for _, dp := range others {
+ for _, dpInbound := range dp.Spec.GetNetworking().GetHealthyInbounds() {
+ tags := map[string]string{}
+ for key, value := range dpInbound.Tags {
+ hasPrefix := func(tagFilter string) bool {
+ return strings.HasPrefix(key, tagFilter)
+ }
+ if len(tagFilters) == 0 || slices.ContainsFunc(tagFilters, hasPrefix) {
+ tags[key] = value
+ }
+ }
+ tagSets.addInstanceOfTags(dp.GetMeta().GetMesh(), tags)
+ }
+ }
+ return tagSets.toAvailableServices()
+}
diff --git a/pkg/xds/ingress/outbound.go b/pkg/xds/ingress/outbound.go
new file mode 100644
index 0000000..b128e88
--- /dev/null
+++ b/pkg/xds/ingress/outbound.go
@@ -0,0 +1,39 @@
+package ingress
+
+import (
+ mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
+ core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
+ core_xds "github.com/apache/dubbo-kubernetes/pkg/core/xds"
+)
+
+func BuildEndpointMap(
+ destinations core_xds.DestinationMap,
+ dataplanes []*core_mesh.DataplaneResource,
+) core_xds.EndpointMap {
+ if len(destinations) == 0 {
+ return nil
+ }
+
+ outbound := core_xds.EndpointMap{}
+ for _, dataplane := range dataplanes {
+ for _, inbound := range dataplane.Spec.GetNetworking().GetHealthyInbounds() {
+ service := inbound.Tags[mesh_proto.ServiceTag]
+ selectors, ok := destinations[service]
+ if !ok {
+ continue
+ }
+ if !selectors.Matches(inbound.Tags) {
+ continue
+ }
+ iface := dataplane.Spec.GetNetworking().ToInboundInterface(inbound)
+ outbound[service] = append(outbound[service], core_xds.Endpoint{
+ Target: iface.DataplaneAdvertisedIP,
+ Port: iface.DataplanePort,
+ Tags: inbound.Tags,
+ Weight: 1,
+ })
+ }
+ }
+
+ return outbound
+}
diff --git a/pkg/xds/ingress/router.go b/pkg/xds/ingress/router.go
new file mode 100644
index 0000000..604961a
--- /dev/null
+++ b/pkg/xds/ingress/router.go
@@ -0,0 +1,19 @@
+package ingress
+
+import (
+ mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
+ core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
+ core_xds "github.com/apache/dubbo-kubernetes/pkg/core/xds"
+)
+
+func BuildDestinationMap(mesh string, ingress *core_mesh.ZoneIngressResource) core_xds.DestinationMap {
+ destinations := core_xds.DestinationMap{}
+ for _, svc := range ingress.Spec.GetAvailableServices() {
+ if mesh != svc.GetMesh() {
+ continue
+ }
+ service := svc.Tags[mesh_proto.ServiceTag]
+ destinations[service] = destinations[service].Add(mesh_proto.MatchTags(svc.Tags))
+ }
+ return destinations
+}
diff --git a/pkg/xds/sync/componenets.go b/pkg/xds/sync/componenets.go
index 4a8e8be..cdf79c8 100644
--- a/pkg/xds/sync/componenets.go
+++ b/pkg/xds/sync/componenets.go
@@ -73,6 +73,7 @@
IngressProxyBuilder: ingressProxyBuilder,
IngressReconciler: ingressReconciler,
EnvoyCpCtx: envoyCpCtx,
+ MeshCache: rt.MeshCache(),
MetadataTracker: metadataTracker,
ResManager: rt.ReadOnlyResourceManager(),
}
diff --git a/pkg/xds/sync/dataplane_watchdog.go b/pkg/xds/sync/dataplane_watchdog.go
index e9b443d..72d239f 100644
--- a/pkg/xds/sync/dataplane_watchdog.go
+++ b/pkg/xds/sync/dataplane_watchdog.go
@@ -116,7 +116,43 @@
}
func (d *DataplaneWatchdog) syncIngress(ctx context.Context, metadata *core_xds.DataplaneMetadata) (SyncResult, error) {
- return SyncResult{}, nil
+ // Rebuilds and reconciles the ZoneIngress proxy, but only when the aggregated hash of all mesh contexts differs from d.lastHash.
+ envoyCtx := &xds_context.Context{
+ ControlPlane: d.EnvoyCpCtx,
+ Mesh: xds_context.MeshContext{}, // ZoneIngress does not have a mesh!
+ }
+
+ aggregatedMeshCtxs, err := xds_context.AggregateMeshContexts(ctx, d.ResManager, d.MeshCache.GetMeshContext)
+ if err != nil {
+ return SyncResult{}, errors.Wrap(err, "could not aggregate mesh contexts")
+ }
+
+ result := SyncResult{
+ ProxyType: mesh_proto.IngressProxyType,
+ }
+ if aggregatedMeshCtxs.Hash == d.lastHash {
+ result.Status = SkipStatus
+ return result, nil
+ }
+
+ d.log.V(1).Info("snapshot hash updated, reconcile", "prev", d.lastHash, "current", aggregatedMeshCtxs.Hash)
+
+ proxy, err := d.IngressProxyBuilder.Build(ctx, d.key, aggregatedMeshCtxs)
+ if err != nil {
+ return SyncResult{}, errors.Wrap(err, "could not build ingress proxy")
+ }
+ proxy.Metadata = metadata
+ changed, err := d.IngressReconciler.Reconcile(ctx, *envoyCtx, proxy)
+ if err != nil {
+ return SyncResult{}, errors.Wrap(err, "could not reconcile")
+ }
+ // Remember the hash only after a successful reconcile: a failed build/reconcile must be retried on the next sync, not skipped.
+ d.lastHash = aggregatedMeshCtxs.Hash
+ result.Status = GeneratedStatus
+ if changed {
+ result.Status = ChangedStatus
+ }
+ return result, nil
}
func (d *DataplaneWatchdog) syncEgress(ctx context.Context, metadata *core_xds.DataplaneMetadata) (SyncResult, error) {
diff --git a/pkg/xds/sync/ingress_proxy_builder.go b/pkg/xds/sync/ingress_proxy_builder.go
index 6cdecb8..5657978 100644
--- a/pkg/xds/sync/ingress_proxy_builder.go
+++ b/pkg/xds/sync/ingress_proxy_builder.go
@@ -22,14 +22,20 @@
)
import (
+ "github.com/apache/dubbo-kubernetes/pkg/core/dns/lookup"
+ core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
+ core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
core_xds "github.com/apache/dubbo-kubernetes/pkg/core/xds"
xds_context "github.com/apache/dubbo-kubernetes/pkg/xds/context"
+ "github.com/apache/dubbo-kubernetes/pkg/xds/ingress"
+ xds_topology "github.com/apache/dubbo-kubernetes/pkg/xds/topology"
)
type IngressProxyBuilder struct {
ResManager manager.ResourceManager
+ LookupIP lookup.LookupIPFunc
apiVersion core_xds.APIVersion
zone string
@@ -39,7 +45,86 @@
func (p *IngressProxyBuilder) Build(
ctx context.Context,
key core_model.ResourceKey,
- aggregatedMeshCtxs xds_context.MeshContext,
+ aggregatedMeshCtxs xds_context.AggregatedMeshContexts,
) (*core_xds.Proxy, error) {
- return &core_xds.Proxy{}, nil
+ zoneIngress, err := p.getZoneIngress(ctx, key, aggregatedMeshCtxs)
+ if err != nil {
+ return nil, err
+ }
+
+ zoneIngress, err = xds_topology.ResolveZoneIngressPublicAddress(p.LookupIP, zoneIngress)
+ if err != nil {
+ return nil, err
+ }
+
+ proxy := &core_xds.Proxy{
+ Id: core_xds.FromResourceKey(key),
+ APIVersion: p.apiVersion,
+ Zone: p.zone,
+ ZoneIngressProxy: p.buildZoneIngressProxy(zoneIngress, aggregatedMeshCtxs),
+ }
+
+ return proxy, nil
+}
+
+func (p *IngressProxyBuilder) buildZoneIngressProxy(
+ zoneIngress *core_mesh.ZoneIngressResource,
+ aggregatedMeshCtxs xds_context.AggregatedMeshContexts,
+) *core_xds.ZoneIngressProxy {
+ var meshResourceList []*core_xds.MeshIngressResources
+
+ for _, mesh := range aggregatedMeshCtxs.Meshes {
+ meshName := mesh.GetMeta().GetName()
+ meshCtx := aggregatedMeshCtxs.MustGetMeshContext(meshName)
+
+ meshResources := &core_xds.MeshIngressResources{
+ Mesh: mesh,
+ EndpointMap: ingress.BuildEndpointMap(
+ ingress.BuildDestinationMap(meshName, zoneIngress),
+ meshCtx.Resources.Dataplanes().Items,
+ ),
+ Resources: meshCtx.Resources.MeshLocalResources,
+ }
+
+ meshResourceList = append(meshResourceList, meshResources)
+ }
+
+ return &core_xds.ZoneIngressProxy{
+ ZoneIngressResource: zoneIngress,
+ MeshResourceList: meshResourceList,
+ }
+}
+
+func (p *IngressProxyBuilder) getZoneIngress(
+ ctx context.Context,
+ key core_model.ResourceKey,
+ aggregatedMeshCtxs xds_context.AggregatedMeshContexts,
+) (*core_mesh.ZoneIngressResource, error) {
+ zoneIngress := core_mesh.NewZoneIngressResource()
+ if err := p.ResManager.Get(ctx, zoneIngress, core_store.GetBy(key)); err != nil {
+ return nil, err
+ }
+ // Update Ingress' Available Services
+ // This was placed as an operation of DataplaneWatchdog out of the convenience.
+ // Consider moving to the outside of this component
+ if err := p.updateIngress(ctx, zoneIngress, aggregatedMeshCtxs); err != nil {
+ return nil, err
+ }
+ return zoneIngress, nil
+}
+
+func (p *IngressProxyBuilder) updateIngress(
+ ctx context.Context, zoneIngress *core_mesh.ZoneIngressResource,
+ aggregatedMeshCtxs xds_context.AggregatedMeshContexts,
+) error {
+ // Update Ingress' Available Services
+ // This was placed as an operation of DataplaneWatchdog out of the convenience.
+ // Consider moving to the outside of this component (follow the pattern of updating VIP outbounds)
+ return ingress.UpdateAvailableServices(
+ ctx,
+ p.ResManager,
+ zoneIngress,
+ aggregatedMeshCtxs.AllDataplanes(),
+ p.ingressTagFilters,
+ )
}
diff --git a/pkg/xds/topology/dataplane.go b/pkg/xds/topology/dataplane.go
index 66683d5..9ece62c 100644
--- a/pkg/xds/topology/dataplane.go
+++ b/pkg/xds/topology/dataplane.go
@@ -65,6 +65,22 @@
return dataplane, nil
}
+func ResolveZoneIngressPublicAddress(lookupIPFunc lookup.LookupIPFunc, zoneIngress *core_mesh.ZoneIngressResource) (*core_mesh.ZoneIngressResource, error) {
+ ip, err := lookupFirstIp(lookupIPFunc, zoneIngress.Spec.GetNetworking().GetAdvertisedAddress())
+ if err != nil {
+ return nil, err
+ }
+ if ip != "" { // only if we resolve any address, in most cases this is IP not a hostname
+ ziSpec := proto.Clone(zoneIngress.Spec).(*mesh_proto.ZoneIngress)
+ ziSpec.Networking.AdvertisedAddress = ip
+ return &core_mesh.ZoneIngressResource{
+ Meta: zoneIngress.Meta,
+ Spec: ziSpec,
+ }, nil
+ }
+ return zoneIngress, nil
+}
+
func lookupFirstIp(lookupIPFunc lookup.LookupIPFunc, address string) (string, error) {
if address == "" || net.ParseIP(address) != nil { // There's either no address or it's already an ip so nothing to do
return "", nil