blob: c01e8bb13aa08789565b331eb80e75cdfc6c069a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package generator
import (
"context"
"fmt"
)
import (
"github.com/pkg/errors"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/core"
core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
"github.com/apache/dubbo-kubernetes/pkg/core/user"
model "github.com/apache/dubbo-kubernetes/pkg/core/xds"
"github.com/apache/dubbo-kubernetes/pkg/util/maps"
util_protocol "github.com/apache/dubbo-kubernetes/pkg/util/protocol"
xds_context "github.com/apache/dubbo-kubernetes/pkg/xds/context"
envoy_common "github.com/apache/dubbo-kubernetes/pkg/xds/envoy"
envoy_clusters "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/clusters"
envoy_listeners "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/listeners"
envoy_names "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/names"
envoy_tags "github.com/apache/dubbo-kubernetes/pkg/xds/envoy/tags"
)
var outboundLog = core.Log.WithName("outbound-proxy-generator")
// OriginOutbound is a marker to indicate by which ProxyGenerator resources were generated.
const OriginOutbound = "outbound"
type OutboundProxyGenerator struct{}
func (g OutboundProxyGenerator) Generator(ctx context.Context, _ *model.ResourceSet, xdsCtx xds_context.Context, proxy *model.Proxy) (*model.ResourceSet, error) {
outbounds := proxy.Dataplane.Spec.Networking.GetOutbound()
resources := model.NewResourceSet()
if len(outbounds) == 0 {
return resources, nil
}
// TODO: implement the logic of tlsReadiness
tlsReadiness := make(map[string]bool)
servicesAcc := envoy_common.NewServicesAccumulator(tlsReadiness)
outboundsMultipleIPs := buildOutboundsWithMultipleIPs(proxy.Dataplane, outbounds)
for _, outbound := range outboundsMultipleIPs {
// Determine the list of destination subsets for the given outbound service
routes := g.determineRoutes(proxy, outbound.Tags)
clusters := routes.Clusters()
// Infer the compatible protocol for all the apps for the given service
protocol := inferProtocol(xdsCtx.Mesh, clusters)
servicesAcc.Add(clusters...)
// Generate listener
listener, err := g.generateLDS(xdsCtx, proxy, routes, outbound, protocol)
if err != nil {
return nil, err
}
resources.Add(&model.Resource{
Name: listener.GetName(),
Origin: OriginOutbound,
Resource: listener,
})
}
services := servicesAcc.Services()
// Generate clusters. It cannot be generated on the fly with outbound loop because we need to know all subsets of the cluster for every service.
cdsResources, err := g.generateCDS(xdsCtx, services, proxy)
if err != nil {
return nil, err
}
resources.AddSet(cdsResources)
edsResources, err := g.generateEDS(ctx, xdsCtx, services, proxy)
if err != nil {
return nil, err
}
resources.AddSet(edsResources)
return resources, nil
}
func (OutboundProxyGenerator) generateLDS(ctx xds_context.Context, proxy *model.Proxy, routes envoy_common.Routes, outbound OutboundWithMultipleIPs, protocol core_mesh.Protocol) (envoy_common.NamedResource, error) {
oface := outbound.Addresses[0]
serviceName := outbound.Tags[mesh_proto.ServiceTag]
outboundListenerName := envoy_names.GetOutboundListenerName(oface.DataplaneIP, oface.DataplanePort)
filterChainBuilder := func() *envoy_listeners.FilterChainBuilder {
filterChainBuilder := envoy_listeners.NewFilterChainBuilder(proxy.APIVersion, envoy_common.AnonymousResource)
switch protocol {
case core_mesh.ProtocolTriple:
// TODO: implement the logic of Triple
// currently, we use the tcp proxy for the triple protocol
filterChainBuilder.
Configure(envoy_listeners.TripleConnectionManager()).
Configure(envoy_listeners.TcpProxyDeprecated(serviceName, routes.Clusters()...))
case core_mesh.ProtocolGRPC:
filterChainBuilder.
Configure(envoy_listeners.HttpConnectionManager(serviceName, false)).
Configure(envoy_listeners.HttpOutboundRoute(serviceName, routes, proxy.Dataplane.Spec.TagSet())).
Configure(envoy_listeners.GrpcStats())
case core_mesh.ProtocolHTTP, core_mesh.ProtocolHTTP2:
filterChainBuilder.
Configure(envoy_listeners.HttpConnectionManager(serviceName, false)).
Configure(envoy_listeners.HttpOutboundRoute(serviceName, routes, proxy.Dataplane.Spec.TagSet()))
case core_mesh.ProtocolKafka:
filterChainBuilder.
Configure(envoy_listeners.Kafka(serviceName)).
Configure(envoy_listeners.TcpProxyDeprecated(serviceName, routes.Clusters()...))
case core_mesh.ProtocolTCP:
fallthrough
default:
// configuration for non-HTTP cases
filterChainBuilder.
Configure(envoy_listeners.TcpProxyDeprecated(serviceName, routes.Clusters()...))
}
return filterChainBuilder
}()
listener, err := envoy_listeners.NewOutboundListenerBuilder(proxy.APIVersion, oface.DataplaneIP, oface.DataplanePort, model.SocketAddressProtocolTCP).
Configure(envoy_listeners.FilterChain(filterChainBuilder)).
Configure(envoy_listeners.TagsMetadata(envoy_tags.Tags(outbound.Tags).WithoutTags(mesh_proto.MeshTag))).
Configure(envoy_listeners.AdditionalAddresses(outbound.AdditionalAddresses())).
Build()
if err != nil {
return nil, errors.Wrapf(err, "could not generate listener %s for service %s", outboundListenerName, serviceName)
}
return listener, nil
}
func (g OutboundProxyGenerator) generateCDS(ctx xds_context.Context, services envoy_common.Services, proxy *model.Proxy) (*model.ResourceSet, error) {
resources := model.NewResourceSet()
for _, serviceName := range services.Sorted() {
service := services[serviceName]
protocol := ctx.Mesh.GetServiceProtocol(serviceName)
for _, c := range service.Clusters() {
cluster := c.(*envoy_common.ClusterImpl)
clusterName := cluster.Name()
edsClusterBuilder := envoy_clusters.NewClusterBuilder(proxy.APIVersion, clusterName)
// clusterTags := []envoy_tags.Tags{cluster.Tags()}
if service.HasExternalService() {
if ctx.Mesh.Resource.ZoneEgressEnabled() {
edsClusterBuilder.
Configure(envoy_clusters.EdsCluster())
} else {
endpoints := proxy.Routing.ExternalServiceOutboundTargets[serviceName]
isIPv6 := proxy.Dataplane.IsIPv6()
edsClusterBuilder.
Configure(envoy_clusters.ProvidedEndpointCluster(isIPv6, endpoints...))
}
switch protocol {
case core_mesh.ProtocolHTTP:
edsClusterBuilder.Configure(envoy_clusters.Http())
case core_mesh.ProtocolHTTP2, core_mesh.ProtocolGRPC:
edsClusterBuilder.Configure(envoy_clusters.Http2())
default:
}
} else {
edsClusterBuilder.
Configure(envoy_clusters.EdsCluster()).
Configure(envoy_clusters.Http2())
}
edsCluster, err := edsClusterBuilder.Build()
if err != nil {
return nil, errors.Wrapf(err, "build CDS for cluster %s failed", clusterName)
}
resources.Add(&model.Resource{
Name: clusterName,
Origin: OriginOutbound,
Resource: edsCluster,
})
}
}
return resources, nil
}
func (OutboundProxyGenerator) generateEDS(
ctx context.Context,
xdsCtx xds_context.Context,
services envoy_common.Services,
proxy *model.Proxy,
) (*model.ResourceSet, error) {
apiVersion := proxy.APIVersion
resources := model.NewResourceSet()
for _, serviceName := range services.Sorted() {
// When no zone egress is present in a mesh Endpoints for ExternalServices
// are specified in load assignment in DNS Cluster.
// We are not allowed to add endpoints with DNS names through EDS.
if !services[serviceName].HasExternalService() || xdsCtx.Mesh.Resource.ZoneEgressEnabled() {
for _, c := range services[serviceName].Clusters() {
cluster := c.(*envoy_common.ClusterImpl)
var endpoints model.EndpointMap
if cluster.Mesh() != "" {
// TODO: CrossMeshEndpoints is not implemented yet
} else {
endpoints = xdsCtx.Mesh.EndpointMap
}
loadAssignment, err := xdsCtx.ControlPlane.CLACache.GetCLA(user.Ctx(ctx, user.ControlPlane), xdsCtx.Mesh.Resource.Meta.GetName(), xdsCtx.Mesh.Hash, cluster, apiVersion, endpoints)
if err != nil {
return nil, errors.Wrapf(err, "could not get ClusterLoadAssignment for %s", serviceName)
}
resources.Add(&model.Resource{
Name: cluster.Name(),
Origin: OriginOutbound,
Resource: loadAssignment,
})
}
}
}
return resources, nil
}
func inferProtocol(meshCtx xds_context.MeshContext, clusters []envoy_common.Cluster) core_mesh.Protocol {
var protocol core_mesh.Protocol = core_mesh.ProtocolUnknown
for idx, cluster := range clusters {
serviceName := cluster.Tags()[mesh_proto.ServiceTag]
serviceProtocol := meshCtx.GetServiceProtocol(serviceName)
if idx == 0 {
protocol = serviceProtocol
continue
}
protocol = util_protocol.GetCommonProtocol(serviceProtocol, protocol)
}
return protocol
}
func (OutboundProxyGenerator) determineRoutes(
proxy *model.Proxy,
outboundTags map[string]string,
) envoy_common.Routes {
var routes envoy_common.Routes
retriveClusters := func() []envoy_common.Cluster {
var clusters []envoy_common.Cluster
service := outboundTags[mesh_proto.ServiceTag]
name, _ := envoy_tags.Tags(outboundTags).DestinationClusterName(nil)
if mesh, ok := outboundTags[mesh_proto.MeshTag]; ok {
// The name should be distinct to the service & mesh combination
name = fmt.Sprintf("%s_%s", name, mesh)
}
// We assume that all the targets are either ExternalServices or not
// therefore we check only the first one
var isExternalService bool
if endpoints := proxy.Routing.OutboundTargets[service]; len(endpoints) > 0 {
isExternalService = endpoints[0].IsExternalService()
}
if endpoints := proxy.Routing.ExternalServiceOutboundTargets[service]; len(endpoints) > 0 {
isExternalService = true
}
allTags := envoy_tags.Tags(outboundTags)
cluster := envoy_common.NewCluster(
envoy_common.WithService(service),
envoy_common.WithName(name),
envoy_common.WithTags(allTags.WithoutTags(mesh_proto.MeshTag)),
envoy_common.WithExternalService(isExternalService),
)
if mesh, ok := outboundTags[mesh_proto.MeshTag]; ok {
cluster.SetMesh(mesh)
}
clusters = append(clusters, cluster)
return clusters
}
appendRoute := func(routes envoy_common.Routes, clusters []envoy_common.Cluster) envoy_common.Routes {
if len(clusters) == 0 {
return routes
}
return append(routes, envoy_common.Route{
Clusters: clusters,
})
}
clusters := retriveClusters()
routes = appendRoute(routes, clusters)
return routes
}
type OutboundWithMultipleIPs struct {
Tags map[string]string
Addresses []mesh_proto.OutboundInterface
}
func (o OutboundWithMultipleIPs) AdditionalAddresses() []mesh_proto.OutboundInterface {
if len(o.Addresses) > 1 {
return o.Addresses[1:]
}
return nil
}
func buildOutboundsWithMultipleIPs(dataplane *core_mesh.DataplaneResource, outbounds []*mesh_proto.Dataplane_Networking_Outbound) []OutboundWithMultipleIPs {
tagsToOutbounds := map[string]OutboundWithMultipleIPs{}
for _, outbound := range outbounds {
tags := outbound.GetTags()
tags[mesh_proto.ServiceTag] = outbound.GetService()
tagsStr := mesh_proto.SingleValueTagSet(tags).String()
owmi := tagsToOutbounds[tagsStr]
owmi.Tags = tags
address := dataplane.Spec.Networking.ToOutboundInterface(outbound)
owmi.Addresses = append([]mesh_proto.OutboundInterface{address}, owmi.Addresses...)
tagsToOutbounds[tagsStr] = owmi
}
// return sorted outbounds for a stable XDS config
var result []OutboundWithMultipleIPs
for _, key := range maps.SortedKeys(tagsToOutbounds) {
result = append(result, tagsToOutbounds[key])
}
return result
}