// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package xds

import (
	"fmt"

	endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
	discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
	any "google.golang.org/protobuf/types/known/anypb"
	networkingapi "istio.io/api/networking/v1alpha3"

	"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
	networking "github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/core/v1alpha3"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/core/v1alpha3/loadbalancer"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/util"
	v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
	"github.com/apache/dubbo-go-pixiu/pkg/config"
	"github.com/apache/dubbo-go-pixiu/pkg/config/protocol"
	"github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk"
	"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)

// PushType is an enumeration that decides what type of push we should do when we get an EDS update.
type PushType int
const (
	// NoPush does not push anything.
NoPush PushType = iota
// IncrementalPush just pushes endpoints.
IncrementalPush
// FullPush triggers full push - typically used for new services.
FullPush
)
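
// The result of edsCacheUpdate is typically consumed like this (a sketch mirroring
// EDSUpdate below; the PushRequest construction is abbreviated for illustration):
//
//	switch pushType := s.edsCacheUpdate(shard, serviceName, namespace, istioEndpoints); pushType {
//	case NoPush:
//		// cache updated, nothing to send
//	case IncrementalPush, FullPush:
//		s.ConfigUpdate(&model.PushRequest{Full: pushType == FullPush /* ... */})
//	}
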
// UpdateServiceShards will list the endpoints and create the shards.
// This is used to reconcile and to support non-k8s registries (until they migrate).
// Note that aggregated list is expensive (for large numbers) - we want to replace
// it with a model where DiscoveryServer keeps track of all endpoint registries
// directly, and calls them one by one.
func (s *DiscoveryServer) UpdateServiceShards(push *model.PushContext) error {
registries := s.getNonK8sRegistries()
// Short circuit now to avoid the call to Services
if len(registries) == 0 {
return nil
}
// Each registry acts as a shard - we don't want to combine them because some
// may individually update their endpoints incrementally
for _, svc := range push.GetAllServices() {
for _, registry := range registries {
			// Skip the service if it does not belong to this registry.
if svc.Attributes.ServiceRegistry != registry.Provider() {
continue
}
endpoints := make([]*model.IstioEndpoint, 0)
for _, port := range svc.Ports {
if port.Protocol == protocol.UDP {
continue
}
// This loses track of grouping (shards)
for _, inst := range registry.InstancesByPort(svc, port.Port, nil) {
endpoints = append(endpoints, inst.Endpoint)
}
}
shard := model.ShardKeyFromRegistry(registry)
s.edsCacheUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, endpoints)
}
}
return nil
}
// SvcUpdate is a callback from service discovery when service info changes.
func (s *DiscoveryServer) SvcUpdate(shard model.ShardKey, hostname string, namespace string, event model.Event) {
	// When a service is deleted, we should clean up the endpoint shards and also remove keys from the
	// EndpointIndex to prevent memory leaks.
if event == model.EventDelete {
inboundServiceDeletes.Increment()
s.EndpointIndex.DeleteServiceShard(shard, hostname, namespace, false)
} else {
inboundServiceUpdates.Increment()
}
}
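
// Illustrative caller (assumed wiring, not defined in this file): a registry controller
// reacting to a service removal might invoke SvcUpdate as follows:
//
//	shard := model.ShardKeyFromRegistry(registry)
//	s.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, model.EventDelete)
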
// EDSUpdate computes destination address membership across all clusters and networks.
// This is the main method implementing EDS.
// It replaces InstancesByPort in model - instead of iterating over all endpoints it uses
// the hostname-keyed map. And it avoids the conversion from Endpoint to ServiceEntry to Envoy
// on each step: instead the conversion happens once, when an endpoint is first discovered.
func (s *DiscoveryServer) EDSUpdate(shard model.ShardKey, serviceName string, namespace string,
istioEndpoints []*model.IstioEndpoint) {
inboundEDSUpdates.Increment()
// Update the endpoint shards
pushType := s.edsCacheUpdate(shard, serviceName, namespace, istioEndpoints)
if pushType == IncrementalPush || pushType == FullPush {
// Trigger a push
s.ConfigUpdate(&model.PushRequest{
Full: pushType == FullPush,
ConfigsUpdated: map[model.ConfigKey]struct{}{{
Kind: gvk.ServiceEntry,
Name: serviceName,
Namespace: namespace,
}: {}},
Reason: []model.TriggerReason{model.EndpointUpdate},
})
}
}
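
// For endpoint changes, the same kind of caller (again assumed, shown only to illustrate the
// parameters) hands over the endpoints it has converted to []*model.IstioEndpoint, much like
// UpdateServiceShards does above:
//
//	s.EDSUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, endpoints)
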
// EDSCacheUpdate computes destination address membership across all clusters and networks.
// This is the main method implementing EDS.
// It replaces InstancesByPort in model - instead of iterating over all endpoints it uses
// the hostname-keyed map. And it avoids the conversion from Endpoint to ServiceEntry to Envoy
// on each step: instead the conversion happens once, when an endpoint is first discovered.
//
// Note: the difference from `EDSUpdate` is that it only updates the cache rather than requesting a push.
func (s *DiscoveryServer) EDSCacheUpdate(shard model.ShardKey, serviceName string, namespace string,
istioEndpoints []*model.IstioEndpoint) {
inboundEDSUpdates.Increment()
// Update the endpoint shards
s.edsCacheUpdate(shard, serviceName, namespace, istioEndpoints)
}
// edsCacheUpdate updates EndpointShards data by clusterID, hostname, IstioEndpoints.
// It also tracks changes to ServiceAccounts. It returns the kind of push required:
// none, incremental (endpoints only), or full.
func (s *DiscoveryServer) edsCacheUpdate(shard model.ShardKey, hostname string, namespace string,
istioEndpoints []*model.IstioEndpoint) PushType {
if len(istioEndpoints) == 0 {
		// We should delete the service's endpoint shards when its endpoint count drops to zero to prevent
		// a memory leak, but we should not delete the keys from the EndpointIndex map - that would trigger
		// an unnecessary full push, which can become a real problem if a pod is in a crash loop and its
		// endpoints keep flipping between 1 and 0.
s.EndpointIndex.DeleteServiceShard(shard, hostname, namespace, true)
log.Infof("Incremental push, service %s at shard %v has no endpoints", hostname, shard)
return IncrementalPush
}
pushType := IncrementalPush
// Find endpoint shard for this service, if it is available - otherwise create a new one.
ep, created := s.EndpointIndex.GetOrCreateEndpointShard(hostname, namespace)
// If we create a new endpoint shard, that means we have not seen the service earlier. We should do a full push.
if created {
log.Infof("Full push, new service %s/%s", namespace, hostname)
pushType = FullPush
}
ep.Lock()
defer ep.Unlock()
newIstioEndpoints := istioEndpoints
if features.SendUnhealthyEndpoints {
oldIstioEndpoints := ep.Shards[shard]
newIstioEndpoints = make([]*model.IstioEndpoint, 0, len(istioEndpoints))
		// Check if the new endpoints are ready to be pushed. This check ensures that if a new pod
		// comes up with a non-ready endpoint, we do not unnecessarily push that config to Envoy.
		// Note that the address is not a unique key, so this may misjudge health status and push
		// more often than strictly necessary - which is ok, since this is only an optimization.
emap := make(map[string]*model.IstioEndpoint, len(oldIstioEndpoints))
nmap := make(map[string]*model.IstioEndpoint, len(newIstioEndpoints))
		// Only add new endpoints to the shards once they have been ready, so that a full push
		// does not send never-ready endpoints from the shards.
for _, oie := range oldIstioEndpoints {
emap[oie.Address] = oie
}
for _, nie := range istioEndpoints {
nmap[nie.Address] = nie
}
needPush := false
for _, nie := range istioEndpoints {
if oie, exists := emap[nie.Address]; exists {
				// If the endpoint already exists, we should push if its health status has changed.
if oie.HealthStatus != nie.HealthStatus {
needPush = true
}
newIstioEndpoints = append(newIstioEndpoints, nie)
} else if nie.HealthStatus == model.Healthy {
// If the endpoint does not exist in shards that means it is a
// new endpoint. Only send if it is healthy to avoid pushing endpoints
// that are not ready to start with.
needPush = true
newIstioEndpoints = append(newIstioEndpoints, nie)
}
}
		// Next, check for endpoints that existed before but no longer exist. If any were removed,
		// we need to push an update.
for _, oie := range oldIstioEndpoints {
if _, f := nmap[oie.Address]; !f {
needPush = true
}
}
if pushType != FullPush && !needPush {
log.Debugf("No push, either old endpoint health status did not change or new endpoint came with unhealthy status, %v", hostname)
pushType = NoPush
}
}
ep.Shards[shard] = newIstioEndpoints
// Check if ServiceAccounts have changed. We should do a full push if they have changed.
saUpdated := s.UpdateServiceAccount(ep, hostname)
// For existing endpoints, we need to do full push if service accounts change.
if saUpdated && pushType != FullPush {
// Avoid extra logging if already a full push
log.Infof("Full push, service accounts changed, %v", hostname)
pushType = FullPush
}
	// Clear the cache here. While it would likely be cleared later when we trigger a push, a race
	// condition is introduced where an XDS response may be generated from the pre-update state but
	// not sent until after the update lands. Essentially, we transition from v0 -> v1 ->
	// v0 -> invalidate -> v1. Reverting a change we pushed violates our contract of monotonically
	// moving forward in version. In practice, this is pretty rare and self-corrects nearly
	// immediately. However, clearing the cache here has almost no impact on cache performance as we
	// would clear it shortly afterwards anyway.
s.Cache.Clear(map[model.ConfigKey]struct{}{{
Kind: gvk.ServiceEntry,
Name: hostname,
Namespace: namespace,
}: {}})
return pushType
}
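
// To summarize the possible outcomes of edsCacheUpdate:
//   - IncrementalPush: the endpoint set became empty (shard dropped, index keys kept), or a
//     known service had endpoint changes worth sending.
//   - FullPush: the service was seen for the first time, or its set of service accounts changed.
//   - NoPush: with SendUnhealthyEndpoints enabled, nothing push-worthy changed (no health
//     transitions, no removals, and any new endpoints are still unhealthy).
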
// RemoveShard removes all endpoints for the given shard from the EndpointIndex.
func (s *DiscoveryServer) RemoveShard(shardKey model.ShardKey) {
s.EndpointIndex.DeleteShard(shardKey)
}
// UpdateServiceAccount updates the set of service accounts associated with the service's endpoints
// when a service/endpoint event happens.
// Note: it is not concurrency safe.
func (s *DiscoveryServer) UpdateServiceAccount(shards *model.EndpointShards, serviceName string) bool {
oldServiceAccount := shards.ServiceAccounts
serviceAccounts := sets.Set{}
for _, epShards := range shards.Shards {
for _, ep := range epShards {
if ep.ServiceAccount != "" {
serviceAccounts.Insert(ep.ServiceAccount)
}
}
}
if !oldServiceAccount.Equals(serviceAccounts) {
shards.ServiceAccounts = serviceAccounts
log.Debugf("Updating service accounts now, svc %v, before service account %v, after %v",
serviceName, oldServiceAccount, serviceAccounts)
return true
}
return false
}
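
// For example, if the shards previously carried only service account "sa-a" and a newly added
// endpoint reports "sa-b", the recomputed set {sa-a, sa-b} differs from the old one, the function
// returns true, and edsCacheUpdate above upgrades the push to a FullPush. (The account names are
// made up for illustration.)
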
// llbEndpointAndOptionsForCluster returns the endpoints for a cluster.
// The initial implementation computes the endpoints on the fly - caching will be added as needed, based on
// perf tests.
func (s *DiscoveryServer) llbEndpointAndOptionsForCluster(b EndpointBuilder) ([]*LocLbEndpointsAndOptions, error) {
if b.service == nil {
// Shouldn't happen here
log.Debugf("can not find the service for cluster %s", b.clusterName)
return nil, nil
}
	// The service resolution type might have changed while the cluster is still in the EDS cluster list of
	// "Connection.Clusters". This can happen if a ServiceEntry's resolution is changed from STATIC to DNS, which
	// changes the Envoy cluster type from EDS to STRICT_DNS or LOGICAL_DNS. If pushEds is called before Envoy sends
	// the updated cluster list via an Endpoint request (which in turn updates "Connection.Clusters"), we might
	// accidentally send EDS updates for a STRICT_DNS cluster. This check guards against such behavior and returns
	// nil. When the updated cluster warms up in Envoy, it will pick up the new endpoints automatically.
	// Gateways use EDS for the Passthrough cluster, so we should allow Passthrough here.
if b.service.Resolution == model.DNSLB || b.service.Resolution == model.DNSRoundRobinLB {
log.Infof("cluster %s in eds cluster, but its resolution now is updated to %v, skipping it.", b.clusterName, b.service.Resolution)
return nil, fmt.Errorf("cluster %s in eds cluster", b.clusterName)
}
svcPort, f := b.service.Ports.GetByPort(b.port)
if !f {
// Shouldn't happen here
log.Debugf("can not find the service port %d for cluster %s", b.port, b.clusterName)
return nil, nil
}
epShards, f := s.EndpointIndex.ShardsForService(string(b.hostname), b.service.Attributes.Namespace)
if !f {
// Shouldn't happen here
log.Debugf("can not find the endpointShards for cluster %s", b.clusterName)
return nil, nil
}
return b.buildLocalityLbEndpointsFromShards(epShards, svcPort), nil
}
// generateEndpoints builds the ClusterLoadAssignment for the cluster described by the builder,
// applying the split-horizon network filter, the mTLS filter (for SNI-DNAT clusters), tunnel
// settings and locality load balancing.
func (s *DiscoveryServer) generateEndpoints(b EndpointBuilder) *endpoint.ClusterLoadAssignment {
llbOpts, err := s.llbEndpointAndOptionsForCluster(b)
if err != nil {
return buildEmptyClusterLoadAssignment(b.clusterName)
}
// Apply the Split Horizon EDS filter, if applicable.
llbOpts = b.EndpointsByNetworkFilter(llbOpts)
if model.IsDNSSrvSubsetKey(b.clusterName) {
// For the SNI-DNAT clusters, we are using AUTO_PASSTHROUGH gateway. AUTO_PASSTHROUGH is intended
// to passthrough mTLS requests. However, at the gateway we do not actually have any way to tell if the
		// request is a valid mTLS request or not, since it's passthrough TLS.
// To ensure we allow traffic only to mTLS endpoints, we filter out non-mTLS endpoints for these cluster types.
llbOpts = b.EndpointsWithMTLSFilter(llbOpts)
}
llbOpts = b.ApplyTunnelSetting(llbOpts, b.tunnelType)
l := b.createClusterLoadAssignment(llbOpts)
// If locality aware routing is enabled, prioritize endpoints or set their lb weight.
// Failover should only be enabled when there is an outlier detection, otherwise Envoy
// will never detect the hosts are unhealthy and redirect traffic.
enableFailover, lb := getOutlierDetectionAndLoadBalancerSettings(b.DestinationRule(), b.port, b.subsetName)
lbSetting := loadbalancer.GetLocalityLbSetting(b.push.Mesh.GetLocalityLbSetting(), lb.GetLocalityLbSetting())
if lbSetting != nil {
// Make a shallow copy of the cla as we are mutating the endpoints with priorities/weights relative to the calling proxy
l = util.CloneClusterLoadAssignment(l)
wrappedLocalityLbEndpoints := make([]*loadbalancer.WrappedLocalityLbEndpoints, len(llbOpts))
for i := range llbOpts {
wrappedLocalityLbEndpoints[i] = &loadbalancer.WrappedLocalityLbEndpoints{
IstioEndpoints: llbOpts[i].istioEndpoints,
LocalityLbEndpoints: l.Endpoints[i],
}
}
loadbalancer.ApplyLocalityLBSetting(l, wrappedLocalityLbEndpoints, b.locality, b.proxy.Metadata.Labels, lbSetting, enableFailover)
}
return l
}
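
// A minimal sketch of driving this path directly (the builder construction mirrors
// buildEndpoints below; clusterName, proxy and push stand for whatever is in scope):
//
//	builder := NewEndpointBuilder(clusterName, proxy, push)
//	cla := s.generateEndpoints(builder)
//	if cla != nil && len(cla.Endpoints) == 0 {
//		// the cluster exists but currently has no endpoints
//	}
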
// EdsGenerator implements the new Generate method for EDS, using the in-memory, optimized endpoint
// storage in DiscoveryServer.
type EdsGenerator struct {
Server *DiscoveryServer
}
var _ model.XdsDeltaResourceGenerator = &EdsGenerator{}
// Map of all configs that do not impact EDS
var skippedEdsConfigs = map[config.GroupVersionKind]struct{}{
gvk.Gateway: {},
gvk.VirtualService: {},
gvk.WorkloadGroup: {},
gvk.AuthorizationPolicy: {},
gvk.RequestAuthentication: {},
gvk.Secret: {},
gvk.Telemetry: {},
gvk.WasmPlugin: {},
gvk.ProxyConfig: {},
}
func edsNeedsPush(updates model.XdsUpdates) bool {
	// If none are set, we always push.
if len(updates) == 0 {
return true
}
for config := range updates {
if _, f := skippedEdsConfigs[config.Kind]; !f {
return true
}
}
return false
}
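
// For example, a push triggered only by a gvk.Gateway or gvk.VirtualService change is skipped
// by EDS, while a gvk.ServiceEntry change - or an empty ConfigsUpdated set, which means
// "push everything" - still requires an EDS push.
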
func (eds *EdsGenerator) Generate(proxy *model.Proxy, w *model.WatchedResource, req *model.PushRequest) (model.Resources, model.XdsLogDetails, error) {
if !edsNeedsPush(req.ConfigsUpdated) {
return nil, model.DefaultXdsLogDetails, nil
}
resources, logDetails := eds.buildEndpoints(proxy, req, w)
return resources, logDetails, nil
}
func getOutlierDetectionAndLoadBalancerSettings(
destinationRule *networkingapi.DestinationRule,
portNumber int,
subsetName string) (bool, *networkingapi.LoadBalancerSettings) {
if destinationRule == nil {
return false, nil
}
outlierDetectionEnabled := false
var lbSettings *networkingapi.LoadBalancerSettings
port := &model.Port{Port: portNumber}
policy := networking.MergeTrafficPolicy(nil, destinationRule.TrafficPolicy, port)
for _, subset := range destinationRule.Subsets {
if subset.Name == subsetName {
policy = networking.MergeTrafficPolicy(policy, subset.TrafficPolicy, port)
break
}
}
if policy != nil {
lbSettings = policy.LoadBalancer
if policy.OutlierDetection != nil {
outlierDetectionEnabled = true
}
}
return outlierDetectionEnabled, lbSettings
}
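
// A minimal sketch of the lookup (the DestinationRule literal is illustrative; field names are
// from istio.io/api networking/v1alpha3):
//
//	dr := &networkingapi.DestinationRule{
//		TrafficPolicy: &networkingapi.TrafficPolicy{
//			OutlierDetection: &networkingapi.OutlierDetection{},
//			LoadBalancer:     &networkingapi.LoadBalancerSettings{},
//		},
//	}
//	enableFailover, lb := getOutlierDetectionAndLoadBalancerSettings(dr, 8080, "")
//	// enableFailover is true because OutlierDetection is set; lb carries the merged
//	// LoadBalancer settings that generateEndpoints feeds into locality load balancing.
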
func endpointDiscoveryResponse(loadAssignments []*any.Any, version, noncePrefix string) *discovery.DiscoveryResponse {
out := &discovery.DiscoveryResponse{
TypeUrl: v3.EndpointType,
// Pilot does not really care for versioning. It always supplies what's currently
// available to it, irrespective of whether Envoy chooses to accept or reject EDS
// responses. Pilot believes in eventual consistency and that at some point, Envoy
// will begin seeing results it deems to be good.
VersionInfo: version,
Nonce: nonce(noncePrefix),
Resources: loadAssignments,
}
return out
}
// buildEmptyClusterLoadAssignment returns a ClusterLoadAssignment for a cluster with no endpoints.
func buildEmptyClusterLoadAssignment(clusterName string) *endpoint.ClusterLoadAssignment {
return &endpoint.ClusterLoadAssignment{
ClusterName: clusterName,
}
}
func (eds *EdsGenerator) GenerateDeltas(proxy *model.Proxy, req *model.PushRequest,
w *model.WatchedResource) (model.Resources, model.DeletedResources, model.XdsLogDetails, bool, error) {
if !edsNeedsPush(req.ConfigsUpdated) {
return nil, nil, model.DefaultXdsLogDetails, false, nil
}
if !shouldUseDeltaEds(req) {
resources, logDetails := eds.buildEndpoints(proxy, req, w)
return resources, nil, logDetails, false, nil
}
resources, removed, logs := eds.buildDeltaEndpoints(proxy, req, w)
return resources, removed, logs, true, nil
}
func shouldUseDeltaEds(req *model.PushRequest) bool {
if !req.Full {
return false
}
return onlyEndpointsChanged(req)
}
// onlyEndpointsChanged checks if a request contains *only* endpoints updates. This allows us to perform more efficient pushes
// where we only update the endpoints that did change.
func onlyEndpointsChanged(req *model.PushRequest) bool {
if len(req.ConfigsUpdated) > 0 {
for k := range req.ConfigsUpdated {
if k.Kind != gvk.ServiceEntry {
return false
}
}
return true
}
return false
}
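
// For example, a full push whose ConfigsUpdated contains only gvk.ServiceEntry keys can be served
// as a delta EDS push; mixing in any other kind, or an empty ConfigsUpdated (meaning "recompute
// everything"), falls back to the regular state-of-the-world build below.
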
func (eds *EdsGenerator) buildEndpoints(proxy *model.Proxy,
req *model.PushRequest,
w *model.WatchedResource) (model.Resources, model.XdsLogDetails) {
var edsUpdatedServices map[string]struct{}
// canSendPartialFullPushes determines if we can send a partial push (ie a subset of known CLAs).
// This is safe when only Services has changed, as this implies that only the CLAs for the
// associated Service changed. Note when a multi-network Service changes it triggers a push with
// ConfigsUpdated=ALL, so in this case we would not enable a partial push.
// Despite this code existing on the SotW code path, sending these partial pushes is still allowed;
// see https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#grouping-resources-into-responses
if !req.Full || (features.PartialFullPushes && onlyEndpointsChanged(req)) {
edsUpdatedServices = model.ConfigNamesOfKind(req.ConfigsUpdated, gvk.ServiceEntry)
}
var resources model.Resources
empty := 0
cached := 0
regenerated := 0
for _, clusterName := range w.ResourceNames {
if edsUpdatedServices != nil {
_, _, hostname, _ := model.ParseSubsetKey(clusterName)
if _, ok := edsUpdatedServices[string(hostname)]; !ok {
// Cluster was not updated, skip recomputing. This happens when we get an incremental update for a
// specific Hostname. On connect or for full push edsUpdatedServices will be empty.
continue
}
}
builder := NewEndpointBuilder(clusterName, proxy, req.Push)
if marshalledEndpoint, f := eds.Server.Cache.Get(builder); f && !features.EnableUnsafeAssertions {
// We skip cache if assertions are enabled, so that the cache will assert our eviction logic is correct
resources = append(resources, marshalledEndpoint)
cached++
} else {
l := eds.Server.generateEndpoints(builder)
if l == nil {
continue
}
regenerated++
if len(l.Endpoints) == 0 {
empty++
}
resource := &discovery.Resource{
Name: l.ClusterName,
Resource: util.MessageToAny(l),
}
resources = append(resources, resource)
eds.Server.Cache.Add(builder, req, resource)
}
}
return resources, model.XdsLogDetails{
Incremental: len(edsUpdatedServices) != 0,
AdditionalInfo: fmt.Sprintf("empty:%v cached:%v/%v", empty, cached, cached+regenerated),
}
}
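
// The AdditionalInfo string renders like "empty:0 cached:5/12": 5 of the 12 CLAs sent in this
// push came straight from the cache, and none of the regenerated ones were empty. (The numbers
// are made up for illustration.)
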
// TODO(@hzxuzhonghu): merge with buildEndpoints
func (eds *EdsGenerator) buildDeltaEndpoints(proxy *model.Proxy,
req *model.PushRequest,
w *model.WatchedResource) (model.Resources, []string, model.XdsLogDetails) {
edsUpdatedServices := model.ConfigNamesOfKind(req.ConfigsUpdated, gvk.ServiceEntry)
var resources model.Resources
var removed []string
empty := 0
cached := 0
regenerated := 0
for _, clusterName := range w.ResourceNames {
		// Filter out clusters whose EDS was not updated.
_, _, hostname, _ := model.ParseSubsetKey(clusterName)
if _, ok := edsUpdatedServices[string(hostname)]; !ok {
continue
}
builder := NewEndpointBuilder(clusterName, proxy, req.Push)
		// If the service is not found, it means the cluster has been removed.
if builder.service == nil {
removed = append(removed, clusterName)
continue
}
if marshalledEndpoint, f := eds.Server.Cache.Get(builder); f && !features.EnableUnsafeAssertions {
// We skip cache if assertions are enabled, so that the cache will assert our eviction logic is correct
resources = append(resources, marshalledEndpoint)
cached++
} else {
l := eds.Server.generateEndpoints(builder)
if l == nil {
removed = append(removed, clusterName)
continue
}
regenerated++
if len(l.Endpoints) == 0 {
empty++
}
resource := &discovery.Resource{
Name: l.ClusterName,
Resource: util.MessageToAny(l),
}
resources = append(resources, resource)
eds.Server.Cache.Add(builder, req, resource)
}
}
return resources, removed, model.XdsLogDetails{
Incremental: len(edsUpdatedServices) != 0,
AdditionalInfo: fmt.Sprintf("empty:%v cached:%v/%v", empty, cached, cached+regenerated),
}
}