blob: bf5e126b43fba7e89f6a454c65afc941cb29137b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xds
import (
"strings"
"time"
dubbogrpc "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/grpc"
"github.com/apache/dubbo-kubernetes/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/anypb"
)
type (
DiscoveryStream = discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer
Resources = []*discovery.Resource
)
type ConnectionContext interface {
XdsConnection() *Connection
Watcher() Watcher
Initialize(node *core.Node) error
Close()
Process(req *discovery.DiscoveryRequest) error
Push(ev any) error
}
type Watcher interface {
DeleteWatchedResource(url string)
GetWatchedResource(url string) *WatchedResource
NewWatchedResource(url string, names []string)
UpdateWatchedResource(string, func(*WatchedResource) *WatchedResource)
GetID() string
}
type WatchedResource struct {
TypeUrl string
ResourceNames sets.String
Wildcard bool
NonceSent string
NonceAcked string
AlwaysRespond bool
LastSendTime time.Time
LastError string
LastResources Resources
}
type ResourceDelta struct {
// Subscribed indicates the client requested these additional resources
Subscribed sets.String
// Unsubscribed indicates the client no longer requires these resources
Unsubscribed sets.String
}
var emptyResourceDelta = ResourceDelta{}
func (rd ResourceDelta) IsEmpty() bool {
return len(rd.Subscribed) == 0 && len(rd.Unsubscribed) == 0
}
type Connection struct {
peerAddr string
connectedAt time.Time
conID string
pushChannel chan any
stream DiscoveryStream
initialized chan struct{}
stop chan struct{}
reqChan chan *discovery.DiscoveryRequest
errorChan chan error
}
func NewConnection(peerAddr string, stream DiscoveryStream) Connection {
return Connection{
pushChannel: make(chan any),
initialized: make(chan struct{}),
stop: make(chan struct{}),
reqChan: make(chan *discovery.DiscoveryRequest, 1),
errorChan: make(chan error, 1),
peerAddr: peerAddr,
connectedAt: time.Now(),
stream: stream,
}
}
func Stream(ctx ConnectionContext) error {
con := ctx.XdsConnection()
go Receive(ctx)
<-con.initialized
for {
// Go select{} statements are not ordered; the same channel can be chosen many times.
// For requests, these are higher priority (client may be blocked on startup until these are done)
// and often very cheap to handle (simple ACK), so we check it first.
select {
case req, ok := <-con.reqChan:
if ok {
if err := ctx.Process(req); err != nil {
return err
}
} else {
// Remote side closed connection or error processing the request.
return <-con.errorChan
}
case <-con.stop:
return nil
default:
}
// If there wasn't already a request, poll for requests and pushes. Note: if we have a huge
// amount of incoming requests, we may still send some pushes, as we do not `continue` above;
// however, requests will be handled ~2x as much as pushes. This ensures a wave of requests
// cannot completely starve pushes. However, this scenario is unlikely.
select {
case req, ok := <-con.reqChan:
if ok {
if err := ctx.Process(req); err != nil {
return err
}
} else {
return <-con.errorChan
}
case pushEv := <-con.pushChannel:
err := ctx.Push(pushEv)
if err != nil {
return err
}
case <-con.stop:
return nil
}
}
}
func Receive(ctx ConnectionContext) {
con := ctx.XdsConnection()
defer func() {
close(con.errorChan)
close(con.reqChan)
select {
case <-con.initialized:
default:
close(con.initialized)
}
}()
firstRequest := true
for {
req, err := con.stream.Recv()
if err != nil {
if dubbogrpc.GRPCErrorType(err) != dubbogrpc.UnexpectedError {
log.Infof("%q %s terminated", con.peerAddr, con.conID)
return
}
con.errorChan <- err
log.Errorf("%q %s terminated with error: %v", con.peerAddr, con.conID, err)
return
}
if firstRequest {
if req.TypeUrl == model.HealthInfoType {
log.Warnf("%q %s send health check probe before normal xDS request", con.peerAddr, con.conID)
continue
}
firstRequest = false
if req.Node == nil || req.Node.Id == "" {
con.errorChan <- status.New(codes.InvalidArgument, "missing node information").Err()
return
}
if err := ctx.Initialize(req.Node); err != nil {
con.errorChan <- err
return
}
defer ctx.Close()
// Connection logged in initConnection() after addCon() to ensure accurate counting
}
resourceNamesStr := " [wildcard]"
if len(req.ResourceNames) > 0 {
resourceNamesStr = " [" + strings.Join(req.ResourceNames, ", ") + "]"
}
log.Infof("%s: RAW REQ %s resources:%d nonce:%s%s",
model.GetShortType(req.TypeUrl), con.conID, len(req.ResourceNames), req.ResponseNonce, resourceNamesStr)
select {
case con.reqChan <- req:
case <-con.stream.Context().Done():
log.Infof("%q %s terminated with stream closed", con.peerAddr, con.conID)
return
}
}
}
func Send(ctx ConnectionContext, res *discovery.DiscoveryResponse) error {
conn := ctx.XdsConnection()
sendResponse := func() error {
return conn.stream.Send(res)
}
err := sendResponse()
if err == nil {
if res.Nonce != "" && !strings.HasPrefix(res.TypeUrl, model.DebugType) {
ctx.Watcher().UpdateWatchedResource(res.TypeUrl, func(wr *WatchedResource) *WatchedResource {
if wr == nil {
wr = &WatchedResource{TypeUrl: res.TypeUrl}
}
wr.NonceSent = res.Nonce
wr.LastSendTime = time.Now()
return wr
})
}
} else if status.Convert(err).Code() == codes.DeadlineExceeded {
log.Infof("Timeout writing %s: %v", conn.conID, model.GetShortType(res.TypeUrl))
}
return err
}
func (conn *Connection) ID() string {
return conn.conID
}
func (conn *Connection) SetID(id string) {
conn.conID = id
}
func (conn *Connection) Peer() string {
return conn.peerAddr
}
func (conn *Connection) MarkInitialized() {
close(conn.initialized)
}
func (conn *Connection) InitializedCh() chan struct{} {
return conn.initialized
}
func (conn *Connection) ErrorCh() chan error {
return conn.errorChan
}
func (conn *Connection) PushCh() chan any {
return conn.pushChannel
}
func (conn *Connection) StopCh() chan struct{} {
return conn.stop
}
func (conn *Connection) StreamDone() <-chan struct{} {
return conn.stream.Context().Done()
}
func ShouldRespond(w Watcher, id string, request *discovery.DiscoveryRequest) (bool, ResourceDelta) {
stype := model.GetShortType(request.TypeUrl)
if request.ErrorDetail != nil {
errCode := codes.Code(request.ErrorDetail.Code)
log.Warnf("%s: ACK ERROR %s %s:%s", stype, id, errCode.String(), request.ErrorDetail.GetMessage())
w.UpdateWatchedResource(request.TypeUrl, func(wr *WatchedResource) *WatchedResource {
wr.LastError = request.ErrorDetail.GetMessage()
return wr
})
return false, emptyResourceDelta
}
if shouldUnsubscribe(request) {
log.Debugf("%s: UNSUBSCRIBE %s %s %s", stype, id, request.VersionInfo, request.ResponseNonce)
w.DeleteWatchedResource(request.TypeUrl)
return false, emptyResourceDelta
}
previousInfo := w.GetWatchedResource(request.TypeUrl)
// This can happen in two cases:
// 1. When Envoy starts for the first time, it sends an initial Discovery request to Istiod.
// 2. When Envoy reconnects to a new Istiod that does not have information about this typeUrl
// i.e. non empty response nonce.
// We should always respond with the current resource names.
if previousInfo == nil {
// No previous info - this is a new request
if request.ResponseNonce == "" {
// Initial request with no nonce
log.Debugf("%s: INIT %s %s", stype, id, request.VersionInfo)
w.NewWatchedResource(request.TypeUrl, request.ResourceNames)
return true, emptyResourceDelta
}
// Client sent a nonce but we have no previous info
// This could be a reconnect, but if we're getting many requests with different nonces,
// they're likely stale/expired requests that should be ignored
// Only treat as reconnect if this is the first request we see with a nonce
// For subsequent requests with nonces we don't recognize, treat as expired
log.Debugf("%s: REQ %s Unknown nonce (no previous info): %s, treating as expired/stale", stype, id, request.ResponseNonce)
// Create the watched resource but don't respond - let the client retry with empty nonce
w.NewWatchedResource(request.TypeUrl, request.ResourceNames)
return false, emptyResourceDelta
}
// We have previous info - check nonce match
if request.ResponseNonce == "" {
// Client sent empty nonce but we have previous info - this is a new request
log.Debugf("%s: INIT (empty nonce) %s %s", stype, id, request.VersionInfo)
w.NewWatchedResource(request.TypeUrl, request.ResourceNames)
return true, emptyResourceDelta
}
// If there is mismatch in the nonce, that is a case of expired/stale nonce.
// A nonce becomes stale following a newer nonce being sent to Envoy.
// previousInfo.NonceSent can be empty if we previously had shouldRespond=true but didn't send any resources.
if request.ResponseNonce != previousInfo.NonceSent {
newResources := sets.New(request.ResourceNames...)
// Special-case proxyless gRPC: Envoy will send a "stale" nonce when it changes
// subscriptions (e.g., after ServiceRoute introduces subset clusters). Treat this
// as a resource change rather than an ACK so the new clusters get a response.
previousResourcesCopy := previousInfo.ResourceNames.Copy()
if !newResources.Equals(previousResourcesCopy) && len(newResources) > 0 {
log.Infof("%s: REQ %s nonce mismatch (got %s, sent %s) but resources changed -> responding",
stype, id, request.ResponseNonce, previousInfo.NonceSent)
added := newResources.Difference(previousResourcesCopy)
w.UpdateWatchedResource(request.TypeUrl, func(wr *WatchedResource) *WatchedResource {
if wr == nil {
return nil
}
wr.LastError = ""
wr.ResourceNames = newResources
// keep previous nonce so the subsequent ACK can match
return wr
})
if len(added) == 0 {
// Still respond to make sure client receives an update even if map difference logic
// thinks nothing was added (e.g., only removal happened).
return true, ResourceDelta{Subscribed: added}
}
return true, ResourceDelta{Subscribed: added}
}
// Expired/stale nonce - don't respond, just log at debug level
if previousInfo.NonceSent == "" {
// We never sent a nonce, but client sent one - this is unusual but treat as expired
log.Debugf("%s: REQ %s Expired nonce received %s, but we never sent any nonce", stype,
id, request.ResponseNonce)
} else {
// Normal case: client sent stale nonce
log.Debugf("%s: REQ %s Expired nonce received %s, sent %s", stype,
id, request.ResponseNonce, previousInfo.NonceSent)
}
return false, emptyResourceDelta
}
// If it comes here, that means nonce match.
var previousResources sets.String
var cur sets.String
var alwaysRespond bool
w.UpdateWatchedResource(request.TypeUrl, func(wr *WatchedResource) *WatchedResource {
// Clear last error, we got an ACK.
wr.LastError = ""
previousResources = wr.ResourceNames
wr.NonceAcked = request.ResponseNonce
wr.ResourceNames = sets.New(request.ResourceNames...)
cur = wr.ResourceNames
alwaysRespond = wr.AlwaysRespond
wr.AlwaysRespond = false
return wr
})
// Envoy can send two DiscoveryRequests with same version and nonce.
// when it detects a new resource. We should respond if they change.
removed := previousResources.Difference(cur)
added := cur.Difference(previousResources)
// CRITICAL FIX: For proxyless gRPC, if client sends wildcard (empty ResourceNames) after receiving specific resources,
// this is likely an ACK and we should NOT push all resources again
// Check if this is a wildcard request after specific resources were sent
if len(request.ResourceNames) == 0 && len(previousResources) > 0 && previousInfo.NonceSent != "" {
// This is a wildcard request after specific resources were sent
// For proxyless gRPC clients, this should be treated as ACK, not a request for all resources
// The client already has the resources from the previous push
// Check if this is proxyless by attempting to get the proxy from watcher
// For now, we'll check if this is a proxyless scenario by the context
// If previous resources existed and client sends empty names with matching nonce, it's an ACK
log.Debugf("%s: wildcard request after specific resources (prev: %d resources, nonce: %s), treating as ACK",
stype, len(previousResources), previousInfo.NonceSent)
// Update ResourceNames to keep previous resources (don't clear them)
w.UpdateWatchedResource(request.TypeUrl, func(wr *WatchedResource) *WatchedResource {
if wr == nil {
return nil
}
// Keep the previous ResourceNames, don't clear them with empty set
// The client is ACKing the previous push, not requesting all resources
wr.ResourceNames = previousResources
return wr
})
return false, emptyResourceDelta
}
// We should always respond "alwaysRespond" marked requests to let Envoy finish warming
// even though Nonce match and it looks like an ACK.
if alwaysRespond {
log.Infof("%s: FORCE RESPONSE %s for warming.", stype, id)
return true, emptyResourceDelta
}
if len(removed) == 0 && len(added) == 0 {
log.Debugf("%s: ACK %s %s %s", stype, id, request.VersionInfo, request.ResponseNonce)
return false, emptyResourceDelta
}
log.Debugf("%s: RESOURCE CHANGE added %v removed %v %s %s %s", stype,
added, removed, id, request.VersionInfo, request.ResponseNonce)
// For non wildcard resource, if no new resources are subscribed, it means we do not need to push.
if !IsWildcardTypeURL(request.TypeUrl) && len(added) == 0 {
return false, emptyResourceDelta
}
return true, ResourceDelta{
Subscribed: added,
// we do not need to set unsubscribed for StoW
}
}
func shouldUnsubscribe(request *discovery.DiscoveryRequest) bool {
return len(request.ResourceNames) == 0 && !IsWildcardTypeURL(request.TypeUrl)
}
func IsWildcardTypeURL(typeURL string) bool {
switch typeURL {
case model.SecretType, model.EndpointType, model.RouteType, model.ExtensionConfigurationType:
// By XDS spec, these are not wildcard
return false
case model.ClusterType, model.ListenerType:
// By XDS spec, these are wildcard
return true
default:
// All of our internal types use wildcard semantics
return true
}
}
func ResourcesToAny(r Resources) []*anypb.Any {
a := make([]*anypb.Any, 0, len(r))
for _, rr := range r {
a = append(a, rr.Resource)
}
return a
}