| // Copyright Istio Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package xds |
| |
| import ( |
| "strconv" |
| "strings" |
| "sync/atomic" |
| "time" |
| ) |
| |
| import ( |
| core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" |
| discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" |
| uatomic "go.uber.org/atomic" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/peer" |
| "google.golang.org/grpc/status" |
| "istio.io/pkg/env" |
| istiolog "istio.io/pkg/log" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/controller/workloadentry" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/features" |
| istiogrpc "github.com/apache/dubbo-go-pixiu/pilot/pkg/grpc" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/model" |
| "github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/util" |
| labelutil "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/util/label" |
| v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3" |
| "github.com/apache/dubbo-go-pixiu/pkg/cluster" |
| "github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk" |
| "github.com/apache/dubbo-go-pixiu/pkg/util/sets" |
| ) |
| |
| var ( |
| log = istiolog.RegisterScope("ads", "ads debugging", 0) |
| |
	// Tracks the number of connections; incremented on each new connection.
| connectionNumber = int64(0) |
| ) |
| |
// Used only when running in Knative, to handle its load balancing behavior.
| var firstRequest = uatomic.NewBool(true) |
| |
var knativeEnv = env.RegisterStringVar("K_REVISION", "",
	"Knative revision, set if running in Knative").Get()
| |
| // DiscoveryStream is a server interface for XDS. |
| type DiscoveryStream = discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer |
| |
| // DeltaDiscoveryStream is a server interface for Delta XDS. |
| type DeltaDiscoveryStream = discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer |
| |
| // DiscoveryClient is a client interface for XDS. |
| type DiscoveryClient = discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient |
| |
| // DeltaDiscoveryClient is a client interface for Delta XDS. |
| type DeltaDiscoveryClient = discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesClient |
| |
| // Connection holds information about connected client. |
| type Connection struct { |
| // peerAddr is the address of the client, from network layer. |
| peerAddr string |
| |
| // Time of connection, for debugging |
| connectedAt time.Time |
| |
	// conID is the connection ID, used as a key in the connection table.
	// It is currently based on the node name and a counter.
| conID string |
| |
| // proxy is the client to which this connection is established. |
| proxy *model.Proxy |
| |
| // Sending on this channel results in a push. |
| pushChannel chan *Event |
| |
| // Both ADS and SDS streams implement this interface |
| stream DiscoveryStream |
| // deltaStream is used for Delta XDS. Only one of deltaStream or stream will be set |
| deltaStream DeltaDiscoveryStream |
| |
| // Original node metadata, to avoid unmarshal/marshal. |
| // This is included in internal events. |
| node *core.Node |
| |
| // initialized channel will be closed when proxy is initialized. Pushes, or anything accessing |
| // the proxy, should not be started until this channel is closed. |
| initialized chan struct{} |
| |
| // stop can be used to end the connection manually via debug endpoints. Only to be used for testing. |
| stop chan struct{} |
| |
| // reqChan is used to receive discovery requests for this connection. |
| reqChan chan *discovery.DiscoveryRequest |
| deltaReqChan chan *discovery.DeltaDiscoveryRequest |
| |
	// errorChan is used to propagate errors that occur while processing discovery requests.
| errorChan chan error |
| } |
| |
| // Event represents a config or registry event that results in a push. |
| type Event struct { |
| // pushRequest PushRequest to use for the push. |
| pushRequest *model.PushRequest |
| |
| // function to call once a push is finished. This must be called or future changes may be blocked. |
| done func() |
| } |
| |
| func newConnection(peerAddr string, stream DiscoveryStream) *Connection { |
| return &Connection{ |
| pushChannel: make(chan *Event), |
| initialized: make(chan struct{}), |
| stop: make(chan struct{}), |
| reqChan: make(chan *discovery.DiscoveryRequest, 1), |
| errorChan: make(chan error, 1), |
| peerAddr: peerAddr, |
| connectedAt: time.Now(), |
| stream: stream, |
| } |
| } |
| |
| func (s *DiscoveryServer) receive(con *Connection, identities []string) { |
| defer func() { |
| close(con.errorChan) |
| close(con.reqChan) |
		// Close the initialized channel, if it's not already closed, to prevent blocking the stream.
| select { |
| case <-con.initialized: |
| default: |
| close(con.initialized) |
| } |
| }() |
| |
| firstRequest := true |
| for { |
| req, err := con.stream.Recv() |
| if err != nil { |
| if istiogrpc.IsExpectedGRPCError(err) { |
| log.Infof("ADS: %q %s terminated", con.peerAddr, con.conID) |
| return |
| } |
| con.errorChan <- err |
| log.Errorf("ADS: %q %s terminated with error: %v", con.peerAddr, con.conID, err) |
| totalXDSInternalErrors.Increment() |
| return |
| } |
		// This should only be set for the first request. The node ID may not be set - for example, by malicious clients.
| if firstRequest { |
			// A health check probe may arrive before Envoy sends its first xDS request.
			if req.TypeUrl == v3.HealthInfoType {
				log.Warnf("ADS: %q %s sent health check probe before normal xDS request", con.peerAddr, con.conID)
| continue |
| } |
| firstRequest = false |
| if req.Node == nil || req.Node.Id == "" { |
| con.errorChan <- status.New(codes.InvalidArgument, "missing node information").Err() |
| return |
| } |
| if err := s.initConnection(req.Node, con, identities); err != nil { |
| con.errorChan <- err |
| return |
| } |
| defer s.closeConnection(con) |
| log.Infof("ADS: new connection for node:%s", con.conID) |
| } |
| |
| select { |
| case con.reqChan <- req: |
| case <-con.stream.Context().Done(): |
| log.Infof("ADS: %q %s terminated with stream closed", con.peerAddr, con.conID) |
| return |
| } |
| } |
| } |
| |
// processRequest handles one discovery request. It is currently called from the 'main' thread, which also
// handles 'push' requests and close - the code eventually calls the 'push' code and therefore needs mutex
// protection. The original code avoided the mutexes by doing both 'push' and 'process request' in the same thread.
| func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *Connection) error { |
| if req.TypeUrl == v3.HealthInfoType { |
| s.handleWorkloadHealthcheck(con.proxy, req) |
| return nil |
| } |
| |
	// For now, don't let piggybacked xDS debug requests start watchers.
| if strings.HasPrefix(req.TypeUrl, v3.DebugType) { |
| return s.pushXds(con, |
| &model.WatchedResource{TypeUrl: req.TypeUrl, ResourceNames: req.ResourceNames}, |
| &model.PushRequest{Full: true, Push: con.proxy.LastPushContext}) |
| } |
| if s.StatusReporter != nil { |
| s.StatusReporter.RegisterEvent(con.conID, req.TypeUrl, req.ResponseNonce) |
| } |
| shouldRespond, delta := s.shouldRespond(con, req) |
| if !shouldRespond { |
| return nil |
| } |
| |
| request := &model.PushRequest{ |
| Full: true, |
| Push: con.proxy.LastPushContext, |
| Reason: []model.TriggerReason{model.ProxyRequest}, |
| |
		// The usage of LastPushTime (rather than time.Now()) is critical here for correctness; this time
		// is used by the XDS cache to determine if an entry is stale. If we used Now() with an old push context,
		// we might end up overriding active cache entries with stale ones.
| Start: con.proxy.LastPushTime, |
| Delta: delta, |
| } |
| |
	// The SidecarScope for the proxy may not have been updated based on this pushContext.
	// This can happen when `processRequest` runs after the push context has been updated (s.initPushContext)
	// but before the proxy's SidecarScope has been updated (s.updateProxy), due to optimizations that skip
	// sidecar scope computation.
| if con.proxy.SidecarScope != nil && con.proxy.SidecarScope.Version != request.Push.PushVersion { |
| s.computeProxyState(con.proxy, request) |
| } |
| return s.pushXds(con, con.Watched(req.TypeUrl), request) |
| } |
| |
| // StreamAggregatedResources implements the ADS interface. |
| func (s *DiscoveryServer) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { |
| return s.Stream(stream) |
| } |
| |
| func (s *DiscoveryServer) Stream(stream DiscoveryStream) error { |
| if knativeEnv != "" && firstRequest.Load() { |
		// In Knative, the first request is the "loading" request; during it, concurrency=1.
		// Once that request completes, concurrency is enabled. However, the XDS stream is
		// long-lived, so the first request would block all others. As a result, we exit the
		// first request immediately; clients will retry.
| firstRequest.Store(false) |
| return status.Error(codes.Unavailable, "server warmup not complete; try again") |
| } |
	// Check if the server is ready to accept clients and process new requests.
	// Currently, ready means caches have been synced and hence clusters can be
	// built correctly. Without this check, the InitContext() call below would
	// initialize with empty config, leading reconnected Envoys to lose their
	// configuration. This is an additional safety check on top of the
	// cachesSynced logic in the readiness probe, to handle cases where kube-proxy
	// iptables updates are delayed.
	// See https://github.com/istio/istio/issues/25495.
| if !s.IsServerReady() { |
| return status.Error(codes.Unavailable, "server is not ready to serve discovery information") |
| } |
| |
| ctx := stream.Context() |
| peerAddr := "0.0.0.0" |
| if peerInfo, ok := peer.FromContext(ctx); ok { |
| peerAddr = peerInfo.Addr.String() |
| } |
| |
| if err := s.WaitForRequestLimit(stream.Context()); err != nil { |
| log.Warnf("ADS: %q exceeded rate limit: %v", peerAddr, err) |
| return status.Errorf(codes.ResourceExhausted, "request rate limit exceeded: %v", err) |
| } |
| |
| ids, err := s.authenticate(ctx) |
| if err != nil { |
| return status.Error(codes.Unauthenticated, err.Error()) |
| } |
| if ids != nil { |
| log.Debugf("Authenticated XDS: %v with identity %v", peerAddr, ids) |
| } else { |
| log.Debugf("Unauthenticated XDS: %s", peerAddr) |
| } |
| |
| // InitContext returns immediately if the context was already initialized. |
| if err = s.globalPushContext().InitContext(s.Env, nil, nil); err != nil { |
| // Error accessing the data - log and close, maybe a different pilot replica |
| // has more luck |
| log.Warnf("Error reading config %v", err) |
| return status.Error(codes.Unavailable, "error reading config") |
| } |
| con := newConnection(peerAddr, stream) |
| |
| // Do not call: defer close(con.pushChannel). The push channel will be garbage collected |
| // when the connection is no longer used. Closing the channel can cause subtle race conditions |
| // with push. According to the spec: "It's only necessary to close a channel when it is important |
| // to tell the receiving goroutines that all data have been sent." |
| |
	// Block until either a request is received or a push is triggered.
	// We need two goroutines because 'read' blocks in Recv().
| go s.receive(con, ids) |
| |
	// Wait for the proxy to be fully initialized before we start serving traffic. Because
	// initialization doesn't have dependencies that will block, there is no need to add a timeout
	// here. Prior to this explicit wait, we were waiting implicitly: receive() would not send to
	// reqChan, and the connection would not be enqueued for pushes to pushChannel, until
	// initialization was complete.
| <-con.initialized |
| |
| for { |
| select { |
| case req, ok := <-con.reqChan: |
| if ok { |
| if err := s.processRequest(req, con); err != nil { |
| return err |
| } |
| } else { |
				// The remote side closed the connection, or an error occurred while processing the request.
| return <-con.errorChan |
| } |
| case pushEv := <-con.pushChannel: |
| err := s.pushConnection(con, pushEv) |
| pushEv.done() |
| if err != nil { |
| return err |
| } |
| case <-con.stop: |
| return nil |
| } |
| } |
| } |
| |
| var emptyResourceDelta = model.ResourceDelta{} |
| |
// shouldRespond determines whether this request needs a response. It applies the ACK/NACK rules of the xDS
// protocol, using the WatchedResource for the previous state and the discovery request for the current state.
| func (s *DiscoveryServer) shouldRespond(con *Connection, request *discovery.DiscoveryRequest) (bool, model.ResourceDelta) { |
| stype := v3.GetShortType(request.TypeUrl) |
| |
	// If the request carries an error, the previous response was rejected (NACK);
	// we do not have to respond in that case. The request's version info will also
	// differ from the version sent, but it is fragile to rely on that.
| if request.ErrorDetail != nil { |
| errCode := codes.Code(request.ErrorDetail.Code) |
| log.Warnf("ADS:%s: ACK ERROR %s %s:%s", stype, con.conID, errCode.String(), request.ErrorDetail.GetMessage()) |
| incrementXDSRejects(request.TypeUrl, con.proxy.ID, errCode.String()) |
| if s.StatusGen != nil { |
| s.StatusGen.OnNack(con.proxy, request) |
| } |
| con.proxy.Lock() |
| if w, f := con.proxy.WatchedResources[request.TypeUrl]; f { |
| w.NonceNacked = request.ResponseNonce |
| } |
| con.proxy.Unlock() |
| return false, emptyResourceDelta |
| } |
| |
| if shouldUnsubscribe(request) { |
| log.Debugf("ADS:%s: UNSUBSCRIBE %s %s %s", stype, con.conID, request.VersionInfo, request.ResponseNonce) |
| con.proxy.Lock() |
| delete(con.proxy.WatchedResources, request.TypeUrl) |
| con.proxy.Unlock() |
| return false, emptyResourceDelta |
| } |
| |
| con.proxy.RLock() |
| previousInfo := con.proxy.WatchedResources[request.TypeUrl] |
| con.proxy.RUnlock() |
| |
	// This can happen in two cases:
	// 1. Envoy initially sends a request to Istiod.
	// 2. Envoy reconnects to Istiod, i.e. Istiod does not have
	//    information about this typeUrl, but Envoy sends a response nonce - either
	//    because Istiod restarted or Envoy disconnected and reconnected.
	// We should always respond with the current resource names.
| if request.ResponseNonce == "" || previousInfo == nil { |
| log.Debugf("ADS:%s: INIT/RECONNECT %s %s %s", stype, con.conID, request.VersionInfo, request.ResponseNonce) |
| con.proxy.Lock() |
| con.proxy.WatchedResources[request.TypeUrl] = &model.WatchedResource{TypeUrl: request.TypeUrl, ResourceNames: request.ResourceNames} |
| con.proxy.Unlock() |
| return true, emptyResourceDelta |
| } |
| |
	// If there is a mismatch in the nonce, it is an expired/stale nonce.
	// A nonce becomes stale when a newer nonce has been sent to Envoy.
| if request.ResponseNonce != previousInfo.NonceSent { |
| log.Debugf("ADS:%s: REQ %s Expired nonce received %s, sent %s", stype, |
| con.conID, request.ResponseNonce, previousInfo.NonceSent) |
| xdsExpiredNonce.With(typeTag.Value(v3.GetMetricType(request.TypeUrl))).Increment() |
| con.proxy.Lock() |
| con.proxy.WatchedResources[request.TypeUrl].NonceNacked = "" |
| con.proxy.Unlock() |
| return false, emptyResourceDelta |
| } |
| |
	// If we reach this point, the nonce matches; this is an ACK. We record
	// the ACK details and respond only if there is a change in resource names.
| con.proxy.Lock() |
| previousResources := con.proxy.WatchedResources[request.TypeUrl].ResourceNames |
| con.proxy.WatchedResources[request.TypeUrl].NonceAcked = request.ResponseNonce |
| con.proxy.WatchedResources[request.TypeUrl].NonceNacked = "" |
| con.proxy.WatchedResources[request.TypeUrl].ResourceNames = request.ResourceNames |
| con.proxy.Unlock() |
| |
| prev := sets.New(previousResources...) |
| cur := sets.New(request.ResourceNames...) |
| removed := prev.Difference(cur) |
| added := cur.Difference(prev) |
	// Envoy can send two DiscoveryRequests with the same version and nonce
	// when it detects a new resource. We should respond only if the resource names change.
| if len(removed) == 0 && len(added) == 0 { |
| log.Debugf("ADS:%s: ACK %s %s %s", stype, con.conID, request.VersionInfo, request.ResponseNonce) |
| return false, emptyResourceDelta |
| } |
| log.Debugf("ADS:%s: RESOURCE CHANGE added %v removed %v %s %s %s", stype, |
| added, removed, con.conID, request.VersionInfo, request.ResponseNonce) |
| |
| return true, model.ResourceDelta{ |
| Subscribed: added, |
| Unsubscribed: removed, |
| } |
| } |
| |
// shouldUnsubscribe checks if we should unsubscribe. This happens when Envoy is
// no longer watching a type. For example, if we remove all RDS references, we will
// unsubscribe from RDS. NOTE: this may happen as part of the initial request. If
// no routes are needed, Envoy will send an empty request, which is handled
// properly here by not adding it to the watched resource list.
| func shouldUnsubscribe(request *discovery.DiscoveryRequest) bool { |
| return len(request.ResourceNames) == 0 && !isWildcardTypeURL(request.TypeUrl) |
| } |
| |
| // isWildcardTypeURL checks whether a given type is a wildcard type |
| // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return |
| // If the list of resource names becomes empty, that means that the client is no |
| // longer interested in any resources of the specified type. For Listener and |
| // Cluster resource types, there is also a “wildcard” mode, which is triggered |
| // when the initial request on the stream for that resource type contains no |
| // resource names. |
| func isWildcardTypeURL(typeURL string) bool { |
| switch typeURL { |
| case v3.SecretType, v3.EndpointType, v3.RouteType, v3.ExtensionConfigurationType, v3.DubboServiceNameMappingType: |
| // By XDS spec, these are not wildcard |
| return false |
| case v3.ClusterType, v3.ListenerType: |
| // By XDS spec, these are wildcard |
| return true |
| default: |
| // All of our internal types use wildcard semantics |
| return true |
| } |
| } |
| |
// listEqualUnordered checks that two lists contain the same elements, ignoring order.
// Note it compares as a set: duplicate entries are not counted.
| func listEqualUnordered(a []string, b []string) bool { |
| if len(a) != len(b) { |
| return false |
| } |
| first := make(map[string]struct{}, len(a)) |
| for _, c := range a { |
| first[c] = struct{}{} |
| } |
| for _, c := range b { |
| _, f := first[c] |
| if !f { |
| return false |
| } |
| } |
| return true |
| } |
| |
// initConnection updates the node associated with the connection after receiving a packet
// from Envoy, and adds the connection to the tracking map.
| func (s *DiscoveryServer) initConnection(node *core.Node, con *Connection, identities []string) error { |
| // Setup the initial proxy metadata |
| proxy, err := s.initProxyMetadata(node) |
| if err != nil { |
| return err |
| } |
	// Check if the proxy's cluster has an alias configured; if so, use it as the cluster ID for this proxy.
| if alias, exists := s.ClusterAliases[proxy.Metadata.ClusterID]; exists { |
| proxy.Metadata.ClusterID = alias |
| } |
	// To ensure the push context is monotonically increasing, set up LastPushContext before addCon.
	// This way only newer push contexts will be registered for this proxy.
| proxy.LastPushContext = s.globalPushContext() |
	// This is the first request, so initialize the connection ID and start tracking it.
| con.conID = connectionID(proxy.ID) |
| con.node = node |
| con.proxy = proxy |
| |
| // Authorize xds clients |
| if err := s.authorize(con, identities); err != nil { |
| return err |
| } |
| |
	// Register the connection. This allows pushes to be triggered for the proxy. Note: the timing of
	// this and initializeProxy is important. While registering for pushes *after* initialization is
	// complete seems like a better choice, it introduces a race condition: if we completed
	// initialization of a new push context between initializeProxy and addCon, we would not get any
	// pushes triggered for the new push context, leaving the proxy with stale state until the next
	// full push.
| s.addCon(con.conID, con) |
	// Mark initialization as complete. This signals waiting callers that it is safe to
	// access the proxy.
| defer close(con.initialized) |
| |
| // Complete full initialization of the proxy |
| if err := s.initializeProxy(node, con); err != nil { |
| s.closeConnection(con) |
| return err |
| } |
| |
| if s.StatusGen != nil { |
| s.StatusGen.OnConnect(con) |
| } |
| return nil |
| } |
| |
| func (s *DiscoveryServer) closeConnection(con *Connection) { |
| if con.conID == "" { |
| return |
| } |
| s.removeCon(con.conID) |
| if s.StatusGen != nil { |
| s.StatusGen.OnDisconnect(con) |
| } |
| if s.StatusReporter != nil { |
| s.StatusReporter.RegisterDisconnect(con.conID, AllEventTypesList) |
| } |
| s.WorkloadEntryController.QueueUnregisterWorkload(con.proxy, con.connectedAt) |
| } |
| |
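// connectionID returns a unique identifier for a connection from the given node, formed by
// appending a global counter to the node name, e.g. (with an illustrative node string)
// "router~10.1.1.1~gw.istio-system~istio-system.svc.cluster.local-42".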
| func connectionID(node string) string { |
| id := atomic.AddInt64(&connectionNumber, 1) |
| return node + "-" + strconv.FormatInt(id, 10) |
| } |
| |
// initProxyMetadata initializes just the basic metadata of a proxy. This is decoupled from
// initializeProxy so that we can perform authorization before attempting the expensive
// computations needed to fully initialize the proxy.
| func (s *DiscoveryServer) initProxyMetadata(node *core.Node) (*model.Proxy, error) { |
| meta, err := model.ParseMetadata(node.Metadata) |
| if err != nil { |
| return nil, err |
| } |
| proxy, err := model.ParseServiceNodeWithMetadata(node.Id, meta) |
| if err != nil { |
| return nil, err |
| } |
| // Update the config namespace associated with this proxy |
| proxy.ConfigNamespace = model.GetProxyConfigNamespace(proxy) |
| proxy.XdsNode = node |
| return proxy, nil |
| } |
| |
| // initializeProxy completes the initialization of a proxy. It is expected to be called only after |
| // initProxyMetadata. |
| func (s *DiscoveryServer) initializeProxy(node *core.Node, con *Connection) error { |
| proxy := con.proxy |
| // this should be done before we look for service instances, but after we load metadata |
| // TODO fix check in kubecontroller treat echo VMs like there isn't a pod |
| if err := s.WorkloadEntryController.RegisterWorkload(proxy, con.connectedAt); err != nil { |
| return err |
| } |
| s.computeProxyState(proxy, nil) |
| |
	// Get the locality from the proxy's service instances.
	// We expect all instances to have the same IP and therefore the same locality,
	// so it's enough to look at the first instance.
| if len(proxy.ServiceInstances) > 0 { |
| proxy.Locality = util.ConvertLocality(proxy.ServiceInstances[0].Endpoint.Locality.Label) |
| } |
| |
	// If there is no locality in the registry, then use the one sent as part of the discovery request.
	// This is less preferable, as only the connected Pilot is aware of this proxy's location, but it
	// can still help provide some client-side Envoy context when load balancing based on location.
| if util.IsLocalityEmpty(proxy.Locality) { |
| proxy.Locality = &core.Locality{ |
| Region: node.Locality.GetRegion(), |
| Zone: node.Locality.GetZone(), |
| SubZone: node.Locality.GetSubZone(), |
| } |
| } |
| |
| locality := util.LocalityToString(proxy.Locality) |
| // add topology labels to proxy metadata labels |
| proxy.Metadata.Labels = labelutil.AugmentLabels(proxy.Metadata.Labels, proxy.Metadata.ClusterID, locality, proxy.Metadata.Network) |
| // Discover supported IP Versions of proxy so that appropriate config can be delivered. |
| proxy.DiscoverIPMode() |
| |
| proxy.WatchedResources = map[string]*model.WatchedResource{} |
| // Based on node metadata and version, we can associate a different generator. |
| if proxy.Metadata.Generator != "" { |
| proxy.XdsResourceGenerator = s.Generators[proxy.Metadata.Generator] |
| } |
| |
| return nil |
| } |
| |
| func (s *DiscoveryServer) updateProxy(proxy *model.Proxy, request *model.PushRequest) { |
| s.computeProxyState(proxy, request) |
| if util.IsLocalityEmpty(proxy.Locality) { |
		// Get the locality from the proxy's service instances.
		// We expect all instances to have the same locality,
		// so it's enough to look at the first instance.
| if len(proxy.ServiceInstances) > 0 { |
| proxy.Locality = util.ConvertLocality(proxy.ServiceInstances[0].Endpoint.Locality.Label) |
| locality := proxy.ServiceInstances[0].Endpoint.Locality.Label |
| // add topology labels to proxy metadata labels |
| proxy.Metadata.Labels = labelutil.AugmentLabels(proxy.Metadata.Labels, proxy.Metadata.ClusterID, locality, proxy.Metadata.Network) |
| } |
| } |
| } |
| |
| func (s *DiscoveryServer) computeProxyState(proxy *model.Proxy, request *model.PushRequest) { |
| proxy.SetWorkloadLabels(s.Env) |
| proxy.SetServiceInstances(s.Env.ServiceDiscovery) |
	// Precompute the sidecar scope and merged gateways associated with this proxy.
	// This saves compute cycles in the networking code. Though it may sometimes be redundant,
	// we still have to compute it because, as part of a config change, a new Sidecar could
	// become applicable to this proxy.
| var sidecar, gateway bool |
| push := proxy.LastPushContext |
| if request == nil { |
| sidecar = true |
| gateway = true |
| } else { |
| push = request.Push |
| if len(request.ConfigsUpdated) == 0 { |
| sidecar = true |
| gateway = true |
| } |
| for conf := range request.ConfigsUpdated { |
| switch conf.Kind { |
| case gvk.ServiceEntry, gvk.DestinationRule, gvk.VirtualService, gvk.Sidecar, gvk.HTTPRoute, gvk.TCPRoute, gvk.ServiceNameMapping: |
| sidecar = true |
| case gvk.Gateway, gvk.KubernetesGateway, gvk.GatewayClass: |
| gateway = true |
| case gvk.Ingress: |
| sidecar = true |
| gateway = true |
| } |
| if sidecar && gateway { |
| break |
| } |
| } |
| } |
	// Compute the SidecarScope for both proxy types whenever it changes.
| if sidecar { |
| proxy.SetSidecarScope(push) |
| } |
	// Only compute gateways for "router" type proxies.
| if gateway && proxy.Type == model.Router { |
| proxy.SetGatewaysForProxy(push) |
| } |
| proxy.LastPushContext = push |
| if request != nil { |
| proxy.LastPushTime = request.Start |
| } |
| } |
| |
// handleWorkloadHealthcheck processes requests of the HealthInformation typeUrl.
| func (s *DiscoveryServer) handleWorkloadHealthcheck(proxy *model.Proxy, req *discovery.DiscoveryRequest) { |
| if features.WorkloadEntryHealthChecks { |
| event := workloadentry.HealthEvent{} |
| event.Healthy = req.ErrorDetail == nil |
| if !event.Healthy { |
| event.Message = req.ErrorDetail.Message |
| } |
| s.WorkloadEntryController.QueueWorkloadEntryHealth(proxy, event) |
| } |
| } |
| |
// DeltaAggregatedResources implements the delta (incremental) ADS protocol.
//
// Alternatively, Generators may send only updates/adds, with a delete indicated by an empty spec.
// This works if both ends follow that model; for example, EDS and the API generator follow this
// pattern.
//
// The delta protocol changes the request, adding unsubscribe/subscribe instead of sending the full
// list of resources. On the response, it adds 'removed resources' and sends only what changed.
| func (s *DiscoveryServer) DeltaAggregatedResources(stream discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error { |
| return s.StreamDeltas(stream) |
| } |
| |
// pushConnection computes and sends the new configuration for a connection.
| func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error { |
| pushRequest := pushEv.pushRequest |
| |
| if pushRequest.Full { |
| // Update Proxy with current information. |
| s.updateProxy(con.proxy, pushRequest) |
| } |
| |
| if !s.ProxyNeedsPush(con.proxy, pushRequest) { |
| log.Debugf("Skipping push to %v, no updates required", con.conID) |
		if pushRequest.Full {
			// Only report for full pushes; incremental pushes do not have a new version.
| reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, nil) |
| } |
| return nil |
| } |
| |
	// Send pushes for all watched resources.
	// Each generator is responsible for determining whether the push event requires a push.
| wrl, ignoreEvents := con.pushDetails() |
| for _, w := range wrl { |
| if err := s.pushXds(con, w, pushRequest); err != nil { |
| return err |
| } |
| } |
| if pushRequest.Full { |
| // Report all events for unwatched resources. Watched resources will be reported in pushXds or on ack. |
| reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, ignoreEvents) |
| } |
| |
| proxiesConvergeDelay.Record(time.Since(pushRequest.Start).Seconds()) |
| return nil |
| } |
| |
// PushOrder defines the order in which updates will be pushed. Any types not listed here
// will be pushed after the listed types, in arbitrary order.
| var PushOrder = []string{v3.ClusterType, v3.EndpointType, v3.ListenerType, v3.RouteType, v3.SecretType} |
| |
| // KnownOrderedTypeUrls has typeUrls for which we know the order of push. |
| var KnownOrderedTypeUrls = map[string]struct{}{ |
| v3.ClusterType: {}, |
| v3.EndpointType: {}, |
| v3.ListenerType: {}, |
| v3.RouteType: {}, |
| v3.SecretType: {}, |
| } |
| |
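// reportAllEvents informs the distribution status reporter that this connection has received
// the given version for every event type, except the types listed in ignored.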
| func reportAllEvents(s DistributionStatusCache, id, version string, ignored sets.Set) { |
| if s == nil { |
| return |
| } |
	// This version of the config will never be distributed to this Envoy because it is not a relevant diff.
	// Inform the distribution status reporter that this connection has been updated, because it effectively has been.
| for distributionType := range AllEventTypes { |
| if ignored.Contains(distributionType) { |
| // Skip this type |
| continue |
| } |
| s.RegisterEvent(id, distributionType, version) |
| } |
| } |
| |
| func (s *DiscoveryServer) adsClientCount() int { |
| s.adsClientsMutex.RLock() |
| defer s.adsClientsMutex.RUnlock() |
| return len(s.adsClients) |
| } |
| |
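// ProxyUpdate triggers a full push for the single connected proxy matching the given cluster ID
// and IP address, if that proxy is connected to this instance.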
| func (s *DiscoveryServer) ProxyUpdate(clusterID cluster.ID, ip string) { |
| var connection *Connection |
| |
| for _, v := range s.Clients() { |
| if v.proxy.Metadata.ClusterID == clusterID && v.proxy.IPAddresses[0] == ip { |
| connection = v |
| break |
| } |
| } |
| |
	// It is possible that the Envoy is not connected to this Pilot; it may be connected to another instance.
| if connection == nil { |
| return |
| } |
| if log.DebugEnabled() { |
| currentlyPending := s.pushQueue.Pending() |
| if currentlyPending != 0 { |
| log.Debugf("Starting new push while %v were still pending", currentlyPending) |
| } |
| } |
| |
| s.pushQueue.Enqueue(connection, &model.PushRequest{ |
| Full: true, |
| Push: s.globalPushContext(), |
| Start: time.Now(), |
| Reason: []model.TriggerReason{model.ProxyUpdate}, |
| }) |
| } |
| |
| // AdsPushAll will send updates to all nodes, for a full config or incremental EDS. |
| func AdsPushAll(s *DiscoveryServer) { |
| s.AdsPushAll(versionInfo(), &model.PushRequest{ |
| Full: true, |
| Push: s.globalPushContext(), |
| Reason: []model.TriggerReason{model.DebugTrigger}, |
| }) |
| } |
| |
| // AdsPushAll implements old style invalidation, generated when any rule or endpoint changes. |
| // Primary code path is from v1 discoveryService.clearCache(), which is added as a handler |
| // to the model ConfigStorageCache and Controller. |
| func (s *DiscoveryServer) AdsPushAll(version string, req *model.PushRequest) { |
| if !req.Full { |
| log.Infof("XDS: Incremental Pushing:%s ConnectedEndpoints:%d Version:%s", |
| version, s.adsClientCount(), req.Push.PushVersion) |
| } else { |
| totalService := len(req.Push.GetAllServices()) |
| log.Infof("XDS: Pushing:%s Services:%d ConnectedEndpoints:%d Version:%s", |
| version, totalService, s.adsClientCount(), req.Push.PushVersion) |
| monServices.Record(float64(totalService)) |
| |
| // Make sure the ConfigsUpdated map exists |
| if req.ConfigsUpdated == nil { |
| req.ConfigsUpdated = make(map[model.ConfigKey]struct{}) |
| } |
| } |
| |
| s.startPush(req) |
| } |
| |
// startPush sends a signal to all connections, with a push event.
| func (s *DiscoveryServer) startPush(req *model.PushRequest) { |
| // Push config changes, iterating over connected envoys. |
| if log.DebugEnabled() { |
| currentlyPending := s.pushQueue.Pending() |
| if currentlyPending != 0 { |
| log.Debugf("Starting new push while %v were still pending", currentlyPending) |
| } |
| } |
| req.Start = time.Now() |
| for _, p := range s.AllClients() { |
| s.pushQueue.Enqueue(p, req) |
| } |
| } |
| |
| func (s *DiscoveryServer) addCon(conID string, con *Connection) { |
| s.adsClientsMutex.Lock() |
| defer s.adsClientsMutex.Unlock() |
| s.adsClients[conID] = con |
| recordXDSClients(con.proxy.Metadata.IstioVersion, 1) |
| } |
| |
| func (s *DiscoveryServer) removeCon(conID string) { |
| s.adsClientsMutex.Lock() |
| defer s.adsClientsMutex.Unlock() |
| |
| if con, exist := s.adsClients[conID]; !exist { |
| log.Errorf("ADS: Removing connection for non-existing node:%v.", conID) |
| totalXDSInternalErrors.Increment() |
| } else { |
| delete(s.adsClients, conID) |
| recordXDSClients(con.proxy.Metadata.IstioVersion, -1) |
| } |
| } |
| |
// send sends a discovery response on the stream, with a timeout if one is configured.
| func (conn *Connection) send(res *discovery.DiscoveryResponse) error { |
| sendHandler := func() error { |
| start := time.Now() |
| defer func() { recordSendTime(time.Since(start)) }() |
| return conn.stream.Send(res) |
| } |
| err := istiogrpc.Send(conn.stream.Context(), sendHandler) |
| if err == nil { |
| sz := 0 |
| for _, rc := range res.Resources { |
| sz += len(rc.Value) |
| } |
| if res.Nonce != "" && !strings.HasPrefix(res.TypeUrl, v3.DebugType) { |
| conn.proxy.Lock() |
| if conn.proxy.WatchedResources[res.TypeUrl] == nil { |
| conn.proxy.WatchedResources[res.TypeUrl] = &model.WatchedResource{TypeUrl: res.TypeUrl} |
| } |
| conn.proxy.WatchedResources[res.TypeUrl].NonceSent = res.Nonce |
| conn.proxy.WatchedResources[res.TypeUrl].VersionSent = res.VersionInfo |
| conn.proxy.WatchedResources[res.TypeUrl].LastSent = time.Now() |
| conn.proxy.Unlock() |
| } |
| } else if status.Convert(err).Code() == codes.DeadlineExceeded { |
| log.Infof("Timeout writing %s", conn.conID) |
| xdsResponseWriteTimeouts.Increment() |
| } |
| return err |
| } |
| |
// nolint
// Synced checks if the type has been synced, meaning the most recent push was ACKed or
// NACKed. The second result reports whether the last send exceeded the flow control timeout.
// Callers must ensure typeUrl is currently watched; an unwatched type causes a nil dereference.
| func (conn *Connection) Synced(typeUrl string) (bool, bool) { |
| conn.proxy.RLock() |
| defer conn.proxy.RUnlock() |
| acked := conn.proxy.WatchedResources[typeUrl].NonceAcked |
| sent := conn.proxy.WatchedResources[typeUrl].NonceSent |
| nacked := conn.proxy.WatchedResources[typeUrl].NonceNacked != "" |
| sendTime := conn.proxy.WatchedResources[typeUrl].LastSent |
| return nacked || acked == sent, time.Since(sendTime) > features.FlowControlTimeout |
| } |
| |
| // nolint |
| func (conn *Connection) NonceAcked(typeUrl string) string { |
| conn.proxy.RLock() |
| defer conn.proxy.RUnlock() |
| if conn.proxy.WatchedResources != nil && conn.proxy.WatchedResources[typeUrl] != nil { |
| return conn.proxy.WatchedResources[typeUrl].NonceAcked |
| } |
| return "" |
| } |
| |
| // nolint |
| func (conn *Connection) NonceSent(typeUrl string) string { |
| conn.proxy.RLock() |
| defer conn.proxy.RUnlock() |
| if conn.proxy.WatchedResources != nil && conn.proxy.WatchedResources[typeUrl] != nil { |
| return conn.proxy.WatchedResources[typeUrl].NonceSent |
| } |
| return "" |
| } |
| |
| func (conn *Connection) Clusters() []string { |
| conn.proxy.RLock() |
| defer conn.proxy.RUnlock() |
| if conn.proxy.WatchedResources != nil && conn.proxy.WatchedResources[v3.EndpointType] != nil { |
| return conn.proxy.WatchedResources[v3.EndpointType].ResourceNames |
| } |
| return []string{} |
| } |
| |
| func (conn *Connection) Routes() []string { |
| conn.proxy.RLock() |
| defer conn.proxy.RUnlock() |
| if conn.proxy.WatchedResources != nil && conn.proxy.WatchedResources[v3.RouteType] != nil { |
| return conn.proxy.WatchedResources[v3.RouteType].ResourceNames |
| } |
| return []string{} |
| } |
| |
| // nolint |
| func (conn *Connection) Watching(typeUrl string) bool { |
| conn.proxy.RLock() |
| defer conn.proxy.RUnlock() |
| if conn.proxy.WatchedResources != nil && conn.proxy.WatchedResources[typeUrl] != nil { |
| return true |
| } |
| return false |
| } |
| |
| // nolint |
| func (conn *Connection) Watched(typeUrl string) *model.WatchedResource { |
| conn.proxy.RLock() |
| defer conn.proxy.RUnlock() |
| if conn.proxy.WatchedResources != nil && conn.proxy.WatchedResources[typeUrl] != nil { |
| return conn.proxy.WatchedResources[typeUrl] |
| } |
| return nil |
| } |
| |
// pushDetails returns the details needed for the current push: the watched resources
// for the proxy, ordered in accordance with the known push order, and the set of
// currently watched typeUrls.
| // nolint |
| func (conn *Connection) pushDetails() ([]*model.WatchedResource, sets.Set) { |
| conn.proxy.RLock() |
| defer conn.proxy.RUnlock() |
| typeUrls := sets.New() |
| for k := range conn.proxy.WatchedResources { |
| typeUrls.Insert(k) |
| } |
| return orderWatchedResources(conn.proxy.WatchedResources), typeUrls |
| } |
| |
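// orderWatchedResources returns the watched resources as a slice, with the types listed in
// PushOrder first (in that order) and all remaining types appended after them.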
| func orderWatchedResources(resources map[string]*model.WatchedResource) []*model.WatchedResource { |
| wr := make([]*model.WatchedResource, 0, len(resources)) |
	// First add all known types, in order.
| for _, tp := range PushOrder { |
| if w, f := resources[tp]; f { |
| wr = append(wr, w) |
| } |
| } |
| // Then add any undeclared types |
| for tp, w := range resources { |
| if _, f := KnownOrderedTypeUrls[tp]; !f { |
| wr = append(wr, w) |
| } |
| } |
| return wr |
| } |
| |
| func (conn *Connection) Stop() { |
| close(conn.stop) |
| } |