| // Copyright Istio Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package v1alpha3 |
| |
| import ( |
| "fmt" |
| "math" |
| "strconv" |
| "strings" |
| ) |
| |
| import ( |
| cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" |
| core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" |
| endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" |
| discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" |
| xdstype "github.com/envoyproxy/go-control-plane/envoy/type/v3" |
| "google.golang.org/protobuf/types/known/durationpb" |
| "google.golang.org/protobuf/types/known/structpb" |
| wrappers "google.golang.org/protobuf/types/known/wrapperspb" |
| meshconfig "istio.io/api/mesh/v1alpha1" |
| networking "istio.io/api/networking/v1alpha3" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/features" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/model" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/core/v1alpha3/envoyfilter" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/core/v1alpha3/loadbalancer" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/telemetry" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/util" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider" |
| "github.com/apache/dubbo-go-pixiu/pkg/config/host" |
| "github.com/apache/dubbo-go-pixiu/pkg/config/protocol" |
| "github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk" |
| "github.com/apache/dubbo-go-pixiu/pkg/util/sets" |
| ) |
| |
| // deltaConfigTypes are used to detect changes and trigger delta calculations. When a config update contains ONLY |
| // entries in this map, delta calculation is triggered. |
| var deltaConfigTypes = sets.New(gvk.ServiceEntry.Kind) |
| |
| // getDefaultCircuitBreakerThresholds returns a copy of the default circuit breaker thresholds. |
| func getDefaultCircuitBreakerThresholds() *cluster.CircuitBreakers_Thresholds { |
| return &cluster.CircuitBreakers_Thresholds{ |
| // DefaultMaxRetries specifies the default for the Envoy circuit breaker parameter max_retries. This |
| // defines the maximum number of parallel retries a given Envoy will allow to the upstream cluster. Envoy defaults |
| // this value to 3; however, that has proven insufficient during periods of pod churn (e.g. rolling updates), |
| // where multiple endpoints in a cluster are terminated. In these scenarios the circuit breaker can kick |
| // in before Pilot is able to deliver an updated endpoint list to Envoy, leading to client-facing 503s. |
| MaxRetries: &wrappers.UInt32Value{Value: math.MaxUint32}, |
| MaxRequests: &wrappers.UInt32Value{Value: math.MaxUint32}, |
| MaxConnections: &wrappers.UInt32Value{Value: math.MaxUint32}, |
| MaxPendingRequests: &wrappers.UInt32Value{Value: math.MaxUint32}, |
| TrackRemaining: true, |
| } |
| } |
| |
| // BuildClusters returns the list of clusters for the given proxy. This is the CDS output. |
| // For outbound: one cluster for each service/subset hostname or CIDR, with SNI set to the service hostname. |
| // The cluster type is based on the service resolution. |
| // For inbound (sidecar only): one cluster for each inbound endpoint port and for each service port. |
| func (configgen *ConfigGeneratorImpl) BuildClusters(proxy *model.Proxy, req *model.PushRequest) ([]*discovery.Resource, model.XdsLogDetails) { |
| // In SotW (state-of-the-world), we care about all services. |
| var services []*model.Service |
| if features.FilterGatewayClusterConfig && proxy.Type == model.Router { |
| services = req.Push.GatewayServices(proxy) |
| } else { |
| services = proxy.SidecarScope.Services() |
| } |
| return configgen.buildClusters(proxy, req, services) |
| } |
| |
| // BuildDeltaClusters generates the deltas (add and delete) for a given proxy. Currently, only service changes are reflected with deltas. |
| // Otherwise, we fall back to generating everything. |
| func (configgen *ConfigGeneratorImpl) BuildDeltaClusters(proxy *model.Proxy, updates *model.PushRequest, |
| watched *model.WatchedResource, |
| ) ([]*discovery.Resource, []string, model.XdsLogDetails, bool) { |
| // if we can't use delta, fall back to generate all |
| if !shouldUseDelta(updates) { |
| cl, lg := configgen.BuildClusters(proxy, updates) |
| return cl, nil, lg, false |
| } |
| |
| var deletedClusters []string |
| var services []*model.Service |
| // holds clusters per service, keyed by hostname. |
| serviceClusters := make(map[string]sets.Set) |
| // holds service ports, keyed by hostname. |
| // inner map holds port and its cluster name. |
| servicePorts := make(map[string]map[int]string) |
| |
| for _, cluster := range watched.ResourceNames { |
| // WatchedResources.ResourceNames contains the names of the clusters the proxy is subscribed to. We can |
| // match them against the name of our service (cluster names are in the format outbound|<port>||<hostname>). |
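| // For example, "outbound|8080||foo.default.svc.cluster.local" refers to port 8080 of the service foo.default.svc.cluster.local. |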
| _, _, svcHost, port := model.ParseSubsetKey(cluster) |
| if serviceClusters[string(svcHost)] == nil { |
| serviceClusters[string(svcHost)] = sets.New() |
| } |
| serviceClusters[string(svcHost)].Insert(cluster) |
| if servicePorts[string(svcHost)] == nil { |
| servicePorts[string(svcHost)] = make(map[int]string) |
| } |
| servicePorts[string(svcHost)][port] = cluster |
| } |
| |
| // In delta, we only care about the services that have changed. |
| for key := range updates.ConfigsUpdated { |
| // get the service that has changed. |
| service := updates.Push.ServiceForHostname(proxy, host.Name(key.Name)) |
| // If this service was removed, all of its clusters were removed as well. |
| if service == nil { |
| for cluster := range serviceClusters[key.Name] { |
| deletedClusters = append(deletedClusters, cluster) |
| } |
| } else { |
| services = append(services, service) |
| // If servicePorts has this service, it is a pre-existing (old) service. |
| if servicePorts[service.Hostname.String()] != nil { |
| oldPorts := servicePorts[service.Hostname.String()] |
| for port, cluster := range oldPorts { |
| // If this service port was removed, its cluster was removed as well. |
| if _, exists := service.Ports.GetByPort(port); !exists { |
| deletedClusters = append(deletedClusters, cluster) |
| } |
| } |
| } |
| } |
| } |
| clusters, log := configgen.buildClusters(proxy, updates, services) |
| return clusters, deletedClusters, log, true |
| } |
| |
| // buildClusters builds clusters for the proxy with the services passed. |
| func (configgen *ConfigGeneratorImpl) buildClusters(proxy *model.Proxy, req *model.PushRequest, |
| services []*model.Service, |
| ) ([]*discovery.Resource, model.XdsLogDetails) { |
| clusters := make([]*cluster.Cluster, 0) |
| resources := model.Resources{} |
| envoyFilterPatches := req.Push.EnvoyFilters(proxy) |
| cb := NewClusterBuilder(proxy, req, configgen.Cache) |
| instances := proxy.ServiceInstances |
| cacheStats := cacheStats{} |
| switch proxy.Type { |
| case model.SidecarProxy: |
| // Setup outbound clusters |
| outboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_OUTBOUND} |
| ob, cs := configgen.buildOutboundClusters(cb, proxy, outboundPatcher, services) |
| cacheStats = cacheStats.merge(cs) |
| resources = append(resources, ob...) |
| // Add a blackhole and passthrough cluster for catching traffic to unresolved routes |
| clusters = outboundPatcher.conditionallyAppend(clusters, nil, cb.buildBlackHoleCluster(), cb.buildDefaultPassthroughCluster()) |
| clusters = append(clusters, outboundPatcher.insertedClusters()...) |
| |
| // Setup inbound clusters |
| inboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_INBOUND} |
| clusters = append(clusters, configgen.buildInboundClusters(cb, proxy, instances, inboundPatcher)...) |
| // Pass-through clusters for inbound traffic. These clusters bind a loopback-ish source address to access node-local services. |
| clusters = inboundPatcher.conditionallyAppend(clusters, nil, cb.buildInboundPassthroughClusters()...) |
| clusters = append(clusters, inboundPatcher.insertedClusters()...) |
| default: // Gateways |
| patcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_GATEWAY} |
| ob, cs := configgen.buildOutboundClusters(cb, proxy, patcher, services) |
| cacheStats = cacheStats.merge(cs) |
| resources = append(resources, ob...) |
| // Gateways do not require the default passthrough cluster as they do not have original dst listeners. |
| clusters = patcher.conditionallyAppend(clusters, nil, cb.buildBlackHoleCluster()) |
| if proxy.Type == model.Router && proxy.MergedGateway != nil && proxy.MergedGateway.ContainsAutoPassthroughGateways { |
| clusters = append(clusters, configgen.buildOutboundSniDnatClusters(proxy, req, patcher)...) |
| } |
| clusters = append(clusters, patcher.insertedClusters()...) |
| } |
| |
| for _, c := range clusters { |
| resources = append(resources, &discovery.Resource{Name: c.Name, Resource: util.MessageToAny(c)}) |
| } |
| resources = cb.normalizeClusters(resources) |
| |
| if cacheStats.empty() { |
| return resources, model.DefaultXdsLogDetails |
| } |
| return resources, model.XdsLogDetails{AdditionalInfo: fmt.Sprintf("cached:%v/%v", cacheStats.hits, cacheStats.hits+cacheStats.miss)} |
| } |
| |
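| // shouldUseDelta reports whether the push request can be served with delta cluster generation, i.e. it is non-empty and every updated config is delta-aware. |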
| func shouldUseDelta(updates *model.PushRequest) bool { |
| return updates != nil && deltaAwareConfigTypes(updates.ConfigsUpdated) && len(updates.ConfigsUpdated) > 0 |
| } |
| |
| // deltaAwareConfigTypes returns true if all updated configs are delta enabled. |
| func deltaAwareConfigTypes(cfgs map[model.ConfigKey]struct{}) bool { |
| for k := range cfgs { |
| if !deltaConfigTypes.Contains(k.Kind.Kind) { |
| return false |
| } |
| } |
| return true |
| } |
| |
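| // cacheStats records the number of CDS cache hits and misses; it is reported in the xDS log details. |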
| type cacheStats struct { |
| hits, miss int |
| } |
| |
| func (c cacheStats) empty() bool { |
| return c.hits == 0 && c.miss == 0 |
| } |
| |
| func (c cacheStats) merge(other cacheStats) cacheStats { |
| return cacheStats{ |
| hits: c.hits + other.hits, |
| miss: c.miss + other.miss, |
| } |
| } |
| |
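| // buildClusterKey builds the cache key for the outbound cluster of the given service and port. The key captures the inputs that influence the generated cluster, so cached clusters can be reused safely. |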
| func buildClusterKey(service *model.Service, port *model.Port, cb *ClusterBuilder, proxy *model.Proxy, efKeys []string) *clusterCache { |
| clusterName := model.BuildSubsetKey(model.TrafficDirectionOutbound, "", service.Hostname, port.Port) |
| clusterKey := &clusterCache{ |
| clusterName: clusterName, |
| proxyVersion: cb.proxyVersion, |
| locality: cb.locality, |
| proxyClusterID: cb.clusterID, |
| proxySidecar: cb.sidecarProxy(), |
| proxyView: cb.proxyView, |
| http2: port.Protocol.IsHTTP2(), |
| downstreamAuto: cb.sidecarProxy() && util.IsProtocolSniffingEnabledForOutboundPort(port), |
| supportsIPv4: cb.supportsIPv4, |
| service: service, |
| destinationRule: proxy.SidecarScope.DestinationRule(model.TrafficDirectionOutbound, proxy, service.Hostname), |
| envoyFilterKeys: efKeys, |
| metadataCerts: cb.metadataCerts, |
| peerAuthVersion: cb.req.Push.AuthnPolicies.GetVersion(), |
| serviceAccounts: cb.req.Push.ServiceAccounts[service.Hostname][port.Port], |
| } |
| return clusterKey |
| } |
| |
| // buildOutboundClusters generates all outbound (including subsets) clusters for a given proxy. |
| func (configgen *ConfigGeneratorImpl) buildOutboundClusters(cb *ClusterBuilder, proxy *model.Proxy, cp clusterPatcher, |
| services []*model.Service, |
| ) ([]*discovery.Resource, cacheStats) { |
| resources := make([]*discovery.Resource, 0) |
| efKeys := cp.efw.Keys() |
| hit, miss := 0, 0 |
| for _, service := range services { |
| for _, port := range service.Ports { |
| if port.Protocol == protocol.UDP { |
| continue |
| } |
| clusterKey := buildClusterKey(service, port, cb, proxy, efKeys) |
| cached, allFound := cb.getAllCachedSubsetClusters(*clusterKey) |
| if allFound && !features.EnableUnsafeAssertions { |
| hit += len(cached) |
| resources = append(resources, cached...) |
| continue |
| } |
| miss += len(cached) |
| |
| // We have a cache miss, so we will re-generate the cluster and later store it in the cache. |
| lbEndpoints := cb.buildLocalityLbEndpoints(clusterKey.proxyView, service, port.Port, nil) |
| |
| // create default cluster |
| discoveryType := convertResolution(cb.proxyType, service) |
| defaultCluster := cb.buildDefaultCluster(clusterKey.clusterName, discoveryType, lbEndpoints, model.TrafficDirectionOutbound, port, service, nil) |
| if defaultCluster == nil { |
| continue |
| } |
| // If stat name is configured, build the alternate stats name. |
| if len(cb.req.Push.Mesh.OutboundClusterStatName) != 0 { |
| defaultCluster.cluster.AltStatName = telemetry.BuildStatPrefix(cb.req.Push.Mesh.OutboundClusterStatName, |
| string(service.Hostname), "", port, &service.Attributes) |
| } |
| |
| subsetClusters := cb.applyDestinationRule(defaultCluster, DefaultClusterMode, service, port, |
| clusterKey.proxyView, clusterKey.destinationRule, clusterKey.serviceAccounts) |
| |
| if patched := cp.applyResource(nil, defaultCluster.build()); patched != nil { |
| resources = append(resources, patched) |
| if features.EnableCDSCaching { |
| cb.cache.Add(clusterKey, cb.req, patched) |
| } |
| } |
| for _, ss := range subsetClusters { |
| if patched := cp.applyResource(nil, ss); patched != nil { |
| resources = append(resources, patched) |
| if features.EnableCDSCaching { |
| nk := *clusterKey |
| nk.clusterName = ss.Name |
| cb.cache.Add(&nk, cb.req, patched) |
| } |
| } |
| } |
| } |
| } |
| |
| return resources, cacheStats{hits: hit, miss: miss} |
| } |
| |
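| // clusterPatcher applies EnvoyFilter cluster patches, for the given patch context, to generated clusters. |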
| type clusterPatcher struct { |
| efw *model.EnvoyFilterWrapper |
| pctx networking.EnvoyFilter_PatchContext |
| } |
| |
| func (p clusterPatcher) applyResource(hosts []host.Name, c *cluster.Cluster) *discovery.Resource { |
| cluster := p.apply(hosts, c) |
| if cluster == nil { |
| return nil |
| } |
| return &discovery.Resource{Name: cluster.Name, Resource: util.MessageToAny(cluster)} |
| } |
| |
| func (p clusterPatcher) apply(hosts []host.Name, c *cluster.Cluster) *cluster.Cluster { |
| if !envoyfilter.ShouldKeepCluster(p.pctx, p.efw, c, hosts) { |
| return nil |
| } |
| return envoyfilter.ApplyClusterMerge(p.pctx, p.efw, c, hosts) |
| } |
| |
| func (p clusterPatcher) conditionallyAppend(l []*cluster.Cluster, hosts []host.Name, clusters ...*cluster.Cluster) []*cluster.Cluster { |
| if !p.hasPatches() { |
| return append(l, clusters...) |
| } |
| for _, c := range clusters { |
| if patched := p.apply(hosts, c); patched != nil { |
| l = append(l, patched) |
| } |
| } |
| return l |
| } |
| |
| func (p clusterPatcher) insertedClusters() []*cluster.Cluster { |
| return envoyfilter.InsertedClusters(p.pctx, p.efw) |
| } |
| |
| func (p clusterPatcher) hasPatches() bool { |
| return p.efw != nil && len(p.efw.Patches[networking.EnvoyFilter_CLUSTER]) > 0 |
| } |
| |
| // buildOutboundSniDnatClusters builds SniDnat clusters. These clusters do not have any TLS settings, as they simply forward traffic upstream. |
| // All SniDnat clusters are internal services in the mesh. |
| // TODO: enable caching - there are no blockers here; it was skipped to simplify the original caching implementation. |
| func (configgen *ConfigGeneratorImpl) buildOutboundSniDnatClusters(proxy *model.Proxy, req *model.PushRequest, |
| cp clusterPatcher, |
| ) []*cluster.Cluster { |
| clusters := make([]*cluster.Cluster, 0) |
| cb := NewClusterBuilder(proxy, req, nil) |
| |
| proxyView := proxy.GetView() |
| |
| for _, service := range proxy.SidecarScope.Services() { |
| if service.MeshExternal { |
| continue |
| } |
| |
| destRule := proxy.SidecarScope.DestinationRule(model.TrafficDirectionOutbound, proxy, service.Hostname) |
| for _, port := range service.Ports { |
| if port.Protocol == protocol.UDP { |
| continue |
| } |
| lbEndpoints := cb.buildLocalityLbEndpoints(proxyView, service, port.Port, nil) |
| |
| // create default cluster |
| discoveryType := convertResolution(cb.proxyType, service) |
| |
| clusterName := model.BuildDNSSrvSubsetKey(model.TrafficDirectionOutbound, "", |
| service.Hostname, port.Port) |
| defaultCluster := cb.buildDefaultCluster(clusterName, discoveryType, lbEndpoints, model.TrafficDirectionOutbound, port, service, nil) |
| if defaultCluster == nil { |
| continue |
| } |
| subsetClusters := cb.applyDestinationRule(defaultCluster, SniDnatClusterMode, service, port, proxyView, destRule, nil) |
| clusters = cp.conditionallyAppend(clusters, nil, defaultCluster.build()) |
| clusters = cp.conditionallyAppend(clusters, nil, subsetClusters...) |
| } |
| } |
| |
| return clusters |
| } |
| |
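| // buildInboundLocalityLbEndpoints returns a single locality LB endpoint for the given bind address and port, or nil if no bind address is provided. |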
| func buildInboundLocalityLbEndpoints(bind string, port uint32) []*endpoint.LocalityLbEndpoints { |
| if bind == "" { |
| return nil |
| } |
| address := util.BuildAddress(bind, port) |
| lbEndpoint := &endpoint.LbEndpoint{ |
| HostIdentifier: &endpoint.LbEndpoint_Endpoint{ |
| Endpoint: &endpoint.Endpoint{ |
| Address: address, |
| }, |
| }, |
| } |
| return []*endpoint.LocalityLbEndpoints{ |
| { |
| LbEndpoints: []*endpoint.LbEndpoint{lbEndpoint}, |
| }, |
| } |
| } |
| |
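| // buildInboundClusters generates the inbound clusters for a sidecar proxy, either from the sidecar ingress listeners or from the proxy's service instances. |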
| func (configgen *ConfigGeneratorImpl) buildInboundClusters(cb *ClusterBuilder, proxy *model.Proxy, instances []*model.ServiceInstance, |
| cp clusterPatcher, |
| ) []*cluster.Cluster { |
| clusters := make([]*cluster.Cluster, 0) |
| |
| // The inbound clusters for a node depend on whether the node has a SidecarScope with inbound listeners |
| // or not. If the node has a SidecarScope with ingress listeners, we only return clusters corresponding |
| // to those listeners, i.e. clusters made out of the defaultEndpoint field. |
| // If the node has no SidecarScope and has interception mode set to NONE, then we should skip the inbound |
| // clusters, because there would be no corresponding inbound listeners. |
| sidecarScope := proxy.SidecarScope |
| noneMode := proxy.GetInterceptionMode() == model.InterceptionNone |
| |
| _, actualLocalHost := getActualWildcardAndLocalHost(proxy) |
| |
| // No user supplied sidecar scope or the user supplied one has no ingress listeners |
| if !sidecarScope.HasIngressListener() { |
| // We should not create inbound listeners in NONE mode based on the service instances. |
| // Doing so would prevent the workloads from starting, as they would be listening on the same port. |
| // Users are required to provide the sidecar config to define the inbound listeners. |
| if noneMode { |
| return nil |
| } |
| |
| clustersToBuild := make(map[int][]*model.ServiceInstance) |
| for _, instance := range instances { |
| // For service instances with the same port, |
| // we still need to capture all the instances on this port, as it's required to populate telemetry metadata. |
| // The first instance will be used as the "primary" instance; this means if we have a conflict between |
| // Services, the first one wins. |
| ep := int(instance.Endpoint.EndpointPort) |
| clustersToBuild[ep] = append(clustersToBuild[ep], instance) |
| } |
| |
| bind := actualLocalHost |
| if features.EnableInboundPassthrough { |
| bind = "" |
| } |
| // For each workload port, we will construct a cluster |
| for epPort, instances := range clustersToBuild { |
| // The inbound cluster port equals the endpoint port. |
| localCluster := cb.buildInboundClusterForPortOrUDS(epPort, bind, proxy, instances[0], instances) |
| // If an inbound cluster match has a service, we should see if it matches any host name across all instances. |
| hosts := make([]host.Name, 0, len(instances)) |
| for _, si := range instances { |
| hosts = append(hosts, si.Service.Hostname) |
| } |
| clusters = cp.conditionallyAppend(clusters, hosts, localCluster.build()) |
| } |
| return clusters |
| } |
| |
| for _, ingressListener := range sidecarScope.Sidecar.Ingress { |
| // LDS would have set up the inbound clusters |
| // as inbound|portNumber|portName|Hostname[or]SidecarScopeID |
| listenPort := &model.Port{ |
| Port: int(ingressListener.Port.Number), |
| Protocol: protocol.Parse(ingressListener.Port.Protocol), |
| Name: ingressListener.Port.Name, |
| } |
| |
| // Set up the endpoint. By default, we set this empty which will use ORIGINAL_DST passthrough. |
| // This can be overridden by ingress.defaultEndpoint. |
| // * 127.0.0.1: send to localhost |
| // * 0.0.0.0: send to INSTANCE_IP |
| // * unix:///...: send to configured unix domain socket |
| endpointAddress := "" |
| port := 0 |
| if strings.HasPrefix(ingressListener.DefaultEndpoint, model.UnixAddressPrefix) { |
| // this is a UDS endpoint. assign it as is |
| endpointAddress = ingressListener.DefaultEndpoint |
| } else if len(ingressListener.DefaultEndpoint) > 0 { |
| // Parse the IP and port. Validation guarantees the presence of ':'. |
| parts := strings.Split(ingressListener.DefaultEndpoint, ":") |
| if len(parts) < 2 { |
| continue |
| } |
| var err error |
| if port, err = strconv.Atoi(parts[1]); err != nil { |
| continue |
| } |
| if parts[0] == model.PodIPAddressPrefix { |
| endpointAddress = cb.proxyIPAddresses[0] |
| } else if parts[0] == model.LocalhostAddressPrefix { |
| endpointAddress = actualLocalHost |
| } |
| } |
| |
| // Find the service instance that corresponds to this ingress listener by looking |
| // for a service instance that matches this ingress port, as this will allow us |
| // to generate the right cluster name that LDS expects: inbound|portNumber|portName|Hostname. |
| instance := findOrCreateServiceInstance(instances, ingressListener, sidecarScope.Name, sidecarScope.Namespace) |
| instance.Endpoint.Address = endpointAddress |
| instance.ServicePort = listenPort |
| instance.Endpoint.ServicePortName = listenPort.Name |
| instance.Endpoint.EndpointPort = uint32(port) |
| |
| localCluster := cb.buildInboundClusterForPortOrUDS(int(ingressListener.Port.Number), endpointAddress, proxy, instance, nil) |
| clusters = cp.conditionallyAppend(clusters, []host.Name{instance.Service.Hostname}, localCluster.build()) |
| } |
| |
| return clusters |
| } |
| |
| func findOrCreateServiceInstance(instances []*model.ServiceInstance, |
| ingressListener *networking.IstioIngressListener, sidecar string, sidecarns string, |
| ) *model.ServiceInstance { |
| for _, realInstance := range instances { |
| if realInstance.Endpoint.EndpointPort == ingressListener.Port.Number { |
| // We need to create a copy of the instance, as it is modified later while building clusters/listeners. |
| return realInstance.DeepCopy() |
| } |
| } |
| // We didn't find a matching instance. Create a dummy one because we need the right |
| // params to generate the right cluster name i.e. inbound|portNumber|portName|SidecarScopeID - which is uniformly generated by LDS/CDS. |
| return &model.ServiceInstance{ |
| Service: &model.Service{ |
| Hostname: host.Name(sidecar + "." + sidecarns), |
| Attributes: model.ServiceAttributes{ |
| Name: sidecar, |
| // This will ensure that the right AuthN policies are selected |
| Namespace: sidecarns, |
| }, |
| }, |
| Endpoint: &model.IstioEndpoint{ |
| EndpointPort: ingressListener.Port.Number, |
| }, |
| } |
| } |
| |
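| // convertResolution maps an Istio service resolution to the Envoy cluster discovery type, taking the proxy type into account. |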
| func convertResolution(proxyType model.NodeType, service *model.Service) cluster.Cluster_DiscoveryType { |
| switch service.Resolution { |
| case model.ClientSideLB: |
| return cluster.Cluster_EDS |
| case model.DNSLB: |
| return cluster.Cluster_STRICT_DNS |
| case model.DNSRoundRobinLB: |
| return cluster.Cluster_LOGICAL_DNS |
| case model.Passthrough: |
| // Gateways cannot use passthrough clusters, so fall back to EDS. |
| if proxyType == model.SidecarProxy { |
| if service.Attributes.ServiceRegistry == provider.Kubernetes && features.EnableEDSForHeadless { |
| return cluster.Cluster_EDS |
| } |
| |
| return cluster.Cluster_ORIGINAL_DST |
| } |
| return cluster.Cluster_EDS |
| default: |
| return cluster.Cluster_EDS |
| } |
| } |
| |
| // selectTrafficPolicyComponents splits the given TrafficPolicy into its connection pool, outlier detection, load balancer, and TLS components. |
| func selectTrafficPolicyComponents(policy *networking.TrafficPolicy) ( |
| *networking.ConnectionPoolSettings, *networking.OutlierDetection, *networking.LoadBalancerSettings, *networking.ClientTLSSettings, |
| ) { |
| if policy == nil { |
| return nil, nil, nil, nil |
| } |
| connectionPool := policy.ConnectionPool |
| outlierDetection := policy.OutlierDetection |
| loadBalancer := policy.LoadBalancer |
| tls := policy.Tls |
| |
| // Check if CA Certificate should be System CA Certificate |
| if features.VerifyCertAtClient && tls != nil && tls.CaCertificates == "" { |
| tls.CaCertificates = "system" |
| } |
| |
| return connectionPool, outlierDetection, loadBalancer, tls |
| } |
| |
| // ClusterMode defines whether the cluster is being built for SNI-DNATing (sni passthrough) or not |
| type ClusterMode string |
| |
| const ( |
| // SniDnatClusterMode indicates cluster is being built for SNI dnat mode |
| SniDnatClusterMode ClusterMode = "sni-dnat" |
| // DefaultClusterMode indicates usual cluster with mTLS et al |
| DefaultClusterMode ClusterMode = "outbound" |
| ) |
| |
| type buildClusterOpts struct { |
| mesh *meshconfig.MeshConfig |
| mutable *MutableCluster |
| policy *networking.TrafficPolicy |
| port *model.Port |
| serviceAccounts []string |
| serviceInstances []*model.ServiceInstance |
| // Used for traffic across multiple network clusters; |
| // the east-west gateway in a remote cluster will use this value to route |
| // traffic to the appropriate service. |
| istioMtlsSni string |
| clusterMode ClusterMode |
| direction model.TrafficDirection |
| meshExternal bool |
| serviceMTLSMode model.MutualTLSMode |
| // Indicates the service registry of the cluster being built. |
| serviceRegistry provider.ID |
| // Indicates if the destinationRule has a workloadSelector. |
| isDrWithSelector bool |
| } |
| |
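| // upgradeTuple pairs the mesh-wide default H2 upgrade policy with the per-connection-pool override. |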
| type upgradeTuple struct { |
| meshdefault meshconfig.MeshConfig_H2UpgradePolicy |
| override networking.ConnectionPoolSettings_HTTPSettings_H2UpgradePolicy |
| } |
| |
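| // applyTCPKeepalive configures TCP keepalive on the cluster from the mesh-wide settings, overridden by the DestinationRule connection pool TCP settings if set. |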
| func applyTCPKeepalive(mesh *meshconfig.MeshConfig, c *cluster.Cluster, tcp *networking.ConnectionPoolSettings_TCPSettings) { |
| // Apply mesh wide TCP keepalive if available. |
| setKeepAliveSettings(c, mesh.TcpKeepalive) |
| |
| // Apply/Override individual attributes with DestinationRule TCP keepalive if set. |
| if tcp != nil { |
| setKeepAliveSettings(c, tcp.TcpKeepalive) |
| } |
| } |
| |
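| // setKeepAliveSettings translates the given keepalive settings onto the cluster's upstream connection options. |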
| func setKeepAliveSettings(c *cluster.Cluster, keepalive *networking.ConnectionPoolSettings_TCPSettings_TcpKeepalive) { |
| if keepalive == nil { |
| return |
| } |
| // Start with empty tcp_keepalive, which would set SO_KEEPALIVE on the socket with OS default values. |
| if c.UpstreamConnectionOptions == nil { |
| c.UpstreamConnectionOptions = &cluster.UpstreamConnectionOptions{ |
| TcpKeepalive: &core.TcpKeepalive{}, |
| } |
| } |
| if keepalive.Probes > 0 { |
| c.UpstreamConnectionOptions.TcpKeepalive.KeepaliveProbes = &wrappers.UInt32Value{Value: keepalive.Probes} |
| } |
| |
| if keepalive.Time != nil { |
| c.UpstreamConnectionOptions.TcpKeepalive.KeepaliveTime = &wrappers.UInt32Value{Value: uint32(keepalive.Time.Seconds)} |
| } |
| |
| if keepalive.Interval != nil { |
| c.UpstreamConnectionOptions.TcpKeepalive.KeepaliveInterval = &wrappers.UInt32Value{Value: uint32(keepalive.Interval.Seconds)} |
| } |
| } |
| |
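| // applyOutlierDetection translates DestinationRule OutlierDetection settings into the Envoy cluster's outlier detection and panic threshold configuration. |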
| // FIXME: there isn't a way to distinguish between unset values and zero values |
| func applyOutlierDetection(c *cluster.Cluster, outlier *networking.OutlierDetection) { |
| if outlier == nil { |
| return |
| } |
| |
| out := &cluster.OutlierDetection{} |
| |
| // SuccessRate based outlier detection should be disabled. |
| out.EnforcingSuccessRate = &wrappers.UInt32Value{Value: 0} |
| |
| if e := outlier.Consecutive_5XxErrors; e != nil { |
| v := e.GetValue() |
| |
| out.Consecutive_5Xx = &wrappers.UInt32Value{Value: v} |
| |
| if v > 0 { |
| v = 100 |
| } |
| out.EnforcingConsecutive_5Xx = &wrappers.UInt32Value{Value: v} |
| } |
| if e := outlier.ConsecutiveGatewayErrors; e != nil { |
| v := e.GetValue() |
| |
| out.ConsecutiveGatewayFailure = &wrappers.UInt32Value{Value: v} |
| |
| if v > 0 { |
| v = 100 |
| } |
| out.EnforcingConsecutiveGatewayFailure = &wrappers.UInt32Value{Value: v} |
| } |
| |
| if outlier.Interval != nil { |
| out.Interval = outlier.Interval |
| } |
| if outlier.BaseEjectionTime != nil { |
| out.BaseEjectionTime = outlier.BaseEjectionTime |
| } |
| if outlier.MaxEjectionPercent > 0 { |
| out.MaxEjectionPercent = &wrappers.UInt32Value{Value: uint32(outlier.MaxEjectionPercent)} |
| } |
| |
| if outlier.SplitExternalLocalOriginErrors { |
| out.SplitExternalLocalOriginErrors = true |
| if outlier.ConsecutiveLocalOriginFailures.GetValue() > 0 { |
| out.ConsecutiveLocalOriginFailure = &wrappers.UInt32Value{Value: outlier.ConsecutiveLocalOriginFailures.Value} |
| out.EnforcingConsecutiveLocalOriginFailure = &wrappers.UInt32Value{Value: 100} |
| } |
| // SuccessRate based outlier detection should be disabled. |
| out.EnforcingLocalOriginSuccessRate = &wrappers.UInt32Value{Value: 0} |
| } |
| |
| c.OutlierDetection = out |
| |
| // Disable panic threshold by default as it's not typically applicable in k8s environments |
| // with few pods per service. |
| // To do so, set the healthy_panic_threshold field even if its value is 0 (it defaults to 50). |
| // FIXME: we can't distinguish between this being unset and being explicitly set to 0. |
| minHealthPercent := outlier.MinHealthPercent |
| if minHealthPercent >= 0 { |
| if c.CommonLbConfig == nil { |
| c.CommonLbConfig = &cluster.Cluster_CommonLbConfig{} |
| } |
| // When we are sending unhealthy endpoints, we should disable the panic threshold. Otherwise |
| // Envoy will send traffic to "Unready" pods when the percentage of healthy hosts falls |
| // below the minimum health percentage. |
| if features.SendUnhealthyEndpoints { |
| minHealthPercent = 0 |
| } |
| c.CommonLbConfig.HealthyPanicThreshold = &xdstype.Percent{Value: float64(minHealthPercent)} |
| } |
| } |
| |
| func defaultLBAlgorithm() cluster.Cluster_LbPolicy { |
| if features.EnableLegacyLBAlgorithmDefault { |
| return cluster.Cluster_ROUND_ROBIN |
| } |
| return cluster.Cluster_LEAST_REQUEST |
| } |
| |
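| // applyLoadBalancer sets the cluster's load balancing policy and related config based on the DestinationRule LoadBalancerSettings, the port protocol, and the mesh-wide locality LB settings. |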
| func applyLoadBalancer(c *cluster.Cluster, lb *networking.LoadBalancerSettings, port *model.Port, |
| locality *core.Locality, proxyLabels map[string]string, meshConfig *meshconfig.MeshConfig, |
| ) { |
| // Disable the panic threshold when SendUnhealthyEndpoints is enabled, as enabling it "may" send traffic to unready |
| // endpoints when the load balancer is in panic mode. |
| if features.SendUnhealthyEndpoints { |
| if c.CommonLbConfig == nil { |
| c.CommonLbConfig = &cluster.Cluster_CommonLbConfig{} |
| } |
| c.CommonLbConfig.HealthyPanicThreshold = &xdstype.Percent{Value: 0} |
| } |
| localityLbSetting := loadbalancer.GetLocalityLbSetting(meshConfig.GetLocalityLbSetting(), lb.GetLocalityLbSetting()) |
| if localityLbSetting != nil { |
| if c.CommonLbConfig == nil { |
| c.CommonLbConfig = &cluster.Cluster_CommonLbConfig{} |
| } |
| c.CommonLbConfig.LocalityConfigSpecifier = &cluster.Cluster_CommonLbConfig_LocalityWeightedLbConfig_{ |
| LocalityWeightedLbConfig: &cluster.Cluster_CommonLbConfig_LocalityWeightedLbConfig{}, |
| } |
| } |
| // Use locality lb settings from load balancer settings if present, else use mesh wide locality lb settings |
| applyLocalityLBSetting(locality, proxyLabels, c, localityLbSetting) |
| |
| if c.GetType() == cluster.Cluster_ORIGINAL_DST { |
| c.LbPolicy = cluster.Cluster_CLUSTER_PROVIDED |
| return |
| } |
| |
| // The Redis protocol must default to MAGLEV to benefit from client-side sharding. |
| if features.EnableRedisFilter && port != nil && port.Protocol == protocol.Redis { |
| c.LbPolicy = cluster.Cluster_MAGLEV |
| return |
| } |
| |
| // Do not use if/else here, since lb.GetSimple returns an enum value (not a pointer). |
| switch lb.GetSimple() { |
| // nolint: staticcheck |
| case networking.LoadBalancerSettings_LEAST_CONN, networking.LoadBalancerSettings_LEAST_REQUEST: |
| applyLeastRequestLoadBalancer(c, lb) |
| case networking.LoadBalancerSettings_RANDOM: |
| c.LbPolicy = cluster.Cluster_RANDOM |
| case networking.LoadBalancerSettings_ROUND_ROBIN: |
| applyRoundRobinLoadBalancer(c, lb) |
| case networking.LoadBalancerSettings_PASSTHROUGH: |
| c.LbPolicy = cluster.Cluster_CLUSTER_PROVIDED |
| c.ClusterDiscoveryType = &cluster.Cluster_Type{Type: cluster.Cluster_ORIGINAL_DST} |
| default: |
| applySimpleDefaultLoadBalancer(c, lb) |
| } |
| |
| ApplyRingHashLoadBalancer(c, lb) |
| } |
| |
| // applySimpleDefaultLoadBalancer sets the default LB policy and creates the corresponding LbConfig if needed. |
| func applySimpleDefaultLoadBalancer(c *cluster.Cluster, loadbalancer *networking.LoadBalancerSettings) { |
| c.LbPolicy = defaultLBAlgorithm() |
| switch c.LbPolicy { |
| case cluster.Cluster_ROUND_ROBIN: |
| applyRoundRobinLoadBalancer(c, loadbalancer) |
| case cluster.Cluster_LEAST_REQUEST: |
| applyLeastRequestLoadBalancer(c, loadbalancer) |
| } |
| } |
| |
| // applyRoundRobinLoadBalancer will set the LbPolicy and create an LbConfig for ROUND_ROBIN if used in LoadBalancerSettings |
| func applyRoundRobinLoadBalancer(c *cluster.Cluster, loadbalancer *networking.LoadBalancerSettings) { |
| c.LbPolicy = cluster.Cluster_ROUND_ROBIN |
| |
| if loadbalancer.GetWarmupDurationSecs() != nil { |
| c.LbConfig = &cluster.Cluster_RoundRobinLbConfig_{ |
| RoundRobinLbConfig: &cluster.Cluster_RoundRobinLbConfig{ |
| SlowStartConfig: setSlowStartConfig(loadbalancer.GetWarmupDurationSecs()), |
| }, |
| } |
| } |
| } |
| |
| // applyLeastRequestLoadBalancer will set the LbPolicy and create an LbConfig for LEAST_REQUEST if used in LoadBalancerSettings |
| func applyLeastRequestLoadBalancer(c *cluster.Cluster, loadbalancer *networking.LoadBalancerSettings) { |
| c.LbPolicy = cluster.Cluster_LEAST_REQUEST |
| |
| if loadbalancer.GetWarmupDurationSecs() != nil { |
| c.LbConfig = &cluster.Cluster_LeastRequestLbConfig_{ |
| LeastRequestLbConfig: &cluster.Cluster_LeastRequestLbConfig{ |
| SlowStartConfig: setSlowStartConfig(loadbalancer.GetWarmupDurationSecs()), |
| }, |
| } |
| } |
| } |
| |
| // setSlowStartConfig builds a SlowStartConfig from the warmupDurationSecs provided in the DestinationRule; it is used for LEAST_REQUEST and ROUND_ROBIN. |
| func setSlowStartConfig(dur *durationpb.Duration) *cluster.Cluster_SlowStartConfig { |
| return &cluster.Cluster_SlowStartConfig{ |
| SlowStartWindow: dur, |
| } |
| } |
| |
| // ApplyRingHashLoadBalancer will set the LbPolicy and create an LbConfig for RING_HASH if used in LoadBalancerSettings |
| func ApplyRingHashLoadBalancer(c *cluster.Cluster, lb *networking.LoadBalancerSettings) { |
| consistentHash := lb.GetConsistentHash() |
| if consistentHash == nil { |
| return |
| } |
| |
| // TODO: MinimumRingSize is an int, and zero could potentially be a valid value; |
| // we are currently unable to distinguish between the set and unset case (GregHanson). |
| // 1024 is the default value for Envoy. |
| minRingSize := &wrappers.UInt64Value{Value: 1024} |
| if consistentHash.MinimumRingSize != 0 { |
| minRingSize = &wrappers.UInt64Value{Value: consistentHash.GetMinimumRingSize()} |
| } |
| c.LbPolicy = cluster.Cluster_RING_HASH |
| c.LbConfig = &cluster.Cluster_RingHashLbConfig_{ |
| RingHashLbConfig: &cluster.Cluster_RingHashLbConfig{ |
| MinimumRingSize: minRingSize, |
| }, |
| } |
| } |
| |
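| // applyLocalityLBSetting applies the locality load balancing setting to the cluster's load assignment, enabling failover only when outlier detection is configured. |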
| func applyLocalityLBSetting(locality *core.Locality, proxyLabels map[string]string, cluster *cluster.Cluster, |
| localityLB *networking.LocalityLoadBalancerSetting, |
| ) { |
| // Failover should only be applied with outlier detection, or traffic will never fail over. |
| enabledFailover := cluster.OutlierDetection != nil |
| if cluster.LoadAssignment != nil { |
| // TODO: enable failoverPriority for `STRICT_DNS` cluster type |
| loadbalancer.ApplyLocalityLBSetting(cluster.LoadAssignment, nil, locality, proxyLabels, localityLB, enabledFailover) |
| } |
| } |
| |
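| // addTelemetryMetadata adds service metadata into the cluster's istio filter metadata; it is consumed by the telemetry filters for metric labels. |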
| func addTelemetryMetadata(opts buildClusterOpts, service *model.Service, direction model.TrafficDirection, instances []*model.ServiceInstance) { |
| if !features.EnableTelemetryLabel { |
| return |
| } |
| if opts.mutable.cluster == nil { |
| return |
| } |
| if direction == model.TrafficDirectionInbound && (opts.serviceInstances == nil || |
| len(opts.serviceInstances) == 0 || opts.port == nil) { |
| // At inbound, the port and the local service instances have to be provided. |
| return |
| } |
| if direction == model.TrafficDirectionOutbound && service == nil { |
| // At outbound, the service corresponding to the cluster has to be provided. |
| return |
| } |
| |
| im := getOrCreateIstioMetadata(opts.mutable.cluster) |
| |
| // Add services field into istio metadata |
| im.Fields["services"] = &structpb.Value{ |
| Kind: &structpb.Value_ListValue{ |
| ListValue: &structpb.ListValue{ |
| Values: []*structpb.Value{}, |
| }, |
| }, |
| } |
| |
| svcMetaList := im.Fields["services"].GetListValue() |
| |
| // Add service related metadata. This will be consumed by telemetry v2 filter for metric labels. |
| if direction == model.TrafficDirectionInbound { |
| // For inbound cluster, add all services on the cluster port |
| have := make(map[host.Name]bool) |
| for _, svc := range instances { |
| if svc.ServicePort.Port != opts.port.Port { |
| // If the service port is different from the port of the cluster that is being built, |
| // skip adding telemetry metadata for the service to the cluster. |
| continue |
| } |
| if _, ok := have[svc.Service.Hostname]; ok { |
| // Skip adding metadata for instance with the same host name. |
| // This could happen when a service has multiple IPs. |
| continue |
| } |
| svcMetaList.Values = append(svcMetaList.Values, buildServiceMetadata(svc.Service)) |
| have[svc.Service.Hostname] = true |
| } |
| } else if direction == model.TrafficDirectionOutbound { |
| // For outbound cluster, add telemetry metadata based on the service that the cluster is built for. |
| svcMetaList.Values = append(svcMetaList.Values, buildServiceMetadata(service)) |
| } |
| } |
| |
| // addNetworkingMetadata inserts the original port into the istio metadata. The port is used in BTS delivered from the client sidecar to the server sidecar. |
| // The server sidecar uses this port after de-multiplexing the tunnel. |
| func addNetworkingMetadata(opts buildClusterOpts, service *model.Service, direction model.TrafficDirection) { |
| if opts.mutable == nil || direction == model.TrafficDirectionInbound { |
| return |
| } |
| if service == nil { |
| // At outbound, the service corresponding to the cluster has to be provided. |
| return |
| } |
| |
| if port, ok := service.Ports.GetByPort(opts.port.Port); ok { |
| im := getOrCreateIstioMetadata(opts.mutable.cluster) |
| |
| // Add original_port field into istio metadata |
| // Endpoint could override this port but the chance should be small. |
| im.Fields["default_original_port"] = &structpb.Value{ |
| Kind: &structpb.Value_NumberValue{ |
| NumberValue: float64(port.Port), |
| }, |
| } |
| } |
| } |
| |
| // buildServiceMetadata builds a struct which contains service metadata; it is added into the cluster's istio metadata. |
| func buildServiceMetadata(svc *model.Service) *structpb.Value { |
| return &structpb.Value{ |
| Kind: &structpb.Value_StructValue{ |
| StructValue: &structpb.Struct{ |
| Fields: map[string]*structpb.Value{ |
| // service fqdn |
| "host": { |
| Kind: &structpb.Value_StringValue{ |
| StringValue: string(svc.Hostname), |
| }, |
| }, |
| // short name of the service |
| "name": { |
| Kind: &structpb.Value_StringValue{ |
| StringValue: svc.Attributes.Name, |
| }, |
| }, |
| // namespace of the service |
| "namespace": { |
| Kind: &structpb.Value_StringValue{ |
| StringValue: svc.Attributes.Namespace, |
| }, |
| }, |
| }, |
| }, |
| }, |
| } |
| } |
| |
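| // getOrCreateIstioMetadata returns the cluster's istio filter metadata struct, creating it if it does not exist. |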
| func getOrCreateIstioMetadata(cluster *cluster.Cluster) *structpb.Struct { |
| if cluster.Metadata == nil { |
| cluster.Metadata = &core.Metadata{ |
| FilterMetadata: map[string]*structpb.Struct{}, |
| } |
| } |
| // Create Istio metadata if it does not exist yet. |
| if _, ok := cluster.Metadata.FilterMetadata[util.IstioMetadataKey]; !ok { |
| cluster.Metadata.FilterMetadata[util.IstioMetadataKey] = &structpb.Struct{ |
| Fields: map[string]*structpb.Value{}, |
| } |
| } |
| return cluster.Metadata.FilterMetadata[util.IstioMetadataKey] |
| } |