blob: 9db226205f8ab806cd8e2f83a901c6e944e319c4 [file] [log] [blame]
package zoneproxy
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
core_xds "github.com/apache/dubbo-kubernetes/pkg/core/xds"
envoy_common "github.com/apache/dubbo-kubernetes/pkg/xds/envoy"
envoy_clusters "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/clusters"
envoy_endpoints "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/endpoints"
envoy_listeners "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/listeners"
envoy_names "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/names"
envoy_tags "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/tags"
)
func GenerateCDS(
destinationsPerService map[string][]envoy_tags.Tags,
services envoy_common.Services,
apiVersion core_xds.APIVersion,
meshName string,
origin string,
) ([]*core_xds.Resource, error) {
matchAllDestinations := destinationsPerService[mesh_proto.MatchAllTag]
var resources []*core_xds.Resource
for _, service := range services.Sorted() {
clusters := services[service]
var tagsSlice envoy_tags.TagsSlice
for _, cluster := range clusters.Clusters() {
tagsSlice = append(tagsSlice, cluster.Tags())
}
tagSlice := append(tagsSlice, matchAllDestinations...)
tagKeySlice := tagSlice.ToTagKeysSlice().Transform(
envoy_tags.Without(mesh_proto.ServiceTag),
)
clusterName := envoy_names.GetMeshClusterName(meshName, service)
edsCluster, err := envoy_clusters.NewClusterBuilder(apiVersion, clusterName).
Configure(envoy_clusters.EdsCluster()).
Configure(envoy_clusters.LbSubset(tagKeySlice)).
Build()
if err != nil {
return nil, err
}
resources = append(resources, &core_xds.Resource{
Name: clusterName,
Origin: origin,
Resource: edsCluster,
})
}
return resources, nil
}
func GenerateEDS(
services envoy_common.Services,
endpointMap core_xds.EndpointMap,
apiVersion core_xds.APIVersion,
meshName string,
origin string,
) ([]*core_xds.Resource, error) {
var resources []*core_xds.Resource
for _, service := range services.Sorted() {
endpoints := endpointMap[service]
clusterName := envoy_names.GetMeshClusterName(meshName, service)
cla, err := envoy_endpoints.CreateClusterLoadAssignment(clusterName, endpoints, apiVersion)
if err != nil {
return nil, err
}
resources = append(resources, &core_xds.Resource{
Name: clusterName,
Origin: origin,
Resource: cla,
})
}
return resources, nil
}
// AddFilterChains adds filter chains to a listener. We generate
// FilterChainsMatcher for each unique destination. This approach has
// a limitation: additional tags on outbound in Universal mode won't work across
// different zones.
func AddFilterChains(
availableServices []*mesh_proto.ZoneIngress_AvailableService,
apiVersion core_xds.APIVersion,
listenerBuilder *envoy_listeners.ListenerBuilder,
destinationsPerService map[string][]envoy_tags.Tags,
endpointMap core_xds.EndpointMap,
) envoy_common.Services {
servicesAcc := envoy_common.NewServicesAccumulator(nil)
for _, service := range availableServices {
serviceName := service.Tags[mesh_proto.ServiceTag]
destinations := destinationsPerService[serviceName]
destinations = append(destinations, destinationsPerService[mesh_proto.MatchAllTag]...)
clusterName := envoy_names.GetMeshClusterName(service.Mesh, serviceName)
serviceEndpoints := endpointMap[serviceName]
for _, destination := range destinations {
// relevantTags is a set of tags for which it actually makes sense to do LB split on.
// If the endpoint list is the same with or without the tag, we should just not do the split.
// This solves the problem that Envoy deduplicate endpoints of the same address and different metadata.
// example 1:
// Ingress1 (10.0.0.1) supports service:a,version:1 and service:a,version:2
// Ingress2 (10.0.0.2) supports service:a,version:1 and service:a,version:2
// If we want to split by version, we don't need to do LB subset on version.
//
// example 2:
// Ingress1 (10.0.0.1) supports service:a,version:1
// Ingress2 (10.0.0.2) supports service:a,version:2
// If we want to split by version, we need LB subset.
relevantTags := envoy_tags.Tags{}
for key, value := range destination {
matchedTargets := map[string]struct{}{}
allTargets := map[string]struct{}{}
for _, endpoint := range serviceEndpoints {
address := endpoint.Address()
if endpoint.Tags[key] == value || value == mesh_proto.MatchAllTag {
matchedTargets[address] = struct{}{}
}
allTargets[address] = struct{}{}
}
if len(matchedTargets) < len(allTargets) {
relevantTags[key] = value
}
}
cluster := envoy_common.NewCluster(
envoy_common.WithName(clusterName),
envoy_common.WithService(serviceName),
envoy_common.WithTags(relevantTags),
)
cluster.SetMesh(service.Mesh)
filterChain := envoy_listeners.FilterChain(
envoy_listeners.NewFilterChainBuilder(apiVersion, envoy_common.AnonymousResource).Configure(
envoy_listeners.TcpProxyDeprecatedWithMetadata(
clusterName,
cluster,
),
),
)
listenerBuilder.Configure(filterChain)
servicesAcc.Add(cluster)
}
}
return servicesAcc.Services()
}