blob: b80a6b5839fd6d89071abec199d55aa85c210637 [file] [log] [blame]
// Copyright Istio Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package xds
import (
import (
discovery ""
import (
v3 ""
var (
versionMutex sync.RWMutex
// version is the timestamp of the last registry event.
version = "0"
// versionNum counts versions
versionNum = atomic.NewUint64(0)
periodicRefreshMetrics = 10 * time.Second
type debounceOptions struct {
// debounceAfter is the delay added to events to wait
// after a registry/config event for debouncing.
// This will delay the push by at least this interval, plus
// the time getting subsequent events. If no change is
// detected the push will happen, otherwise we'll keep
// delaying until things settle.
debounceAfter time.Duration
// debounceMax is the maximum time to wait for events
// while debouncing. Defaults to 10 seconds. If events keep
// showing up with no break for this time, we'll trigger a push.
debounceMax time.Duration
// enableEDSDebounce indicates whether EDS pushes should be debounced.
enableEDSDebounce bool
// DiscoveryServer is Pilot's gRPC implementation for Envoy's xds APIs
type DiscoveryServer struct {
// Env is the model environment.
Env *model.Environment
// MemRegistry is used for debug and load testing, allow adding services. Visible for testing.
MemRegistry *memory.ServiceDiscovery
// ConfigGenerator is responsible for generating data plane configuration using Istio networking
// APIs and service registry info
ConfigGenerator core.ConfigGenerator
// Generators allow customizing the generated config, based on the client metadata.
// Key is the generator type - will match the Generator metadata to set the per-connection
// default generator, or the combination of Generator metadata and TypeUrl to select a
// different generator for a type.
// Normal istio clients use the default generator - will not be impacted by this.
Generators map[string]model.XdsResourceGenerator
// ProxyNeedsPush is a function that determines whether a push can be completely skipped. Individual generators
// may also choose to not send any updates.
ProxyNeedsPush func(proxy *model.Proxy, req *model.PushRequest) bool
// concurrentPushLimit is a semaphore that limits the amount of concurrent XDS pushes.
concurrentPushLimit chan struct{}
// requestRateLimit limits the number of new XDS requests allowed. This helps prevent thundering hurd of incoming requests.
requestRateLimit *rate.Limiter
// InboundUpdates describes the number of configuration updates the discovery server has received
InboundUpdates *atomic.Int64
// CommittedUpdates describes the number of configuration updates the discovery server has
// received, process, and stored in the push context. If this number is less than InboundUpdates,
// there are updates we have not yet processed.
// Note: This does not mean that all proxies have received these configurations; it is strictly
// the push context, which means that the next push to a proxy will receive this configuration.
CommittedUpdates *atomic.Int64
// EndpointShards for a service. This is a global (per-server) list, built from
// incremental updates. This is keyed by service and namespace
EndpointIndex *model.EndpointIndex
// pushChannel is the buffer used for debouncing.
// after debouncing the pushRequest will be sent to pushQueue
pushChannel chan *model.PushRequest
// mutex used for protecting Environment.PushContext
updateMutex sync.RWMutex
// pushQueue is the buffer that used after debounce and before the real xds push.
pushQueue *PushQueue
// debugHandlers is the list of all the supported debug handlers.
debugHandlers map[string]string
// adsClients reflect active gRPC channels, for both ADS and EDS.
adsClients map[string]*Connection
adsClientsMutex sync.RWMutex
StatusReporter DistributionStatusCache
// Authenticators for XDS requests. Should be same/subset of the CA authenticators.
Authenticators []security.Authenticator
// StatusGen is notified of connect/disconnect/nack on all connections
StatusGen *StatusGen
WorkloadEntryController *workloadentry.Controller
// serverReady indicates caches have been synced up and server is ready to process requests.
serverReady atomic.Bool
debounceOptions debounceOptions
instanceID string
// Cache for XDS resources
Cache model.XdsCache
// JwtKeyResolver holds a reference to the JWT key resolver instance.
JwtKeyResolver *model.JwksResolver
// ListRemoteClusters collects debug information about other clusters this istiod reads from.
ListRemoteClusters func() []cluster.DebugInfo
// ClusterAliases are aliase names for cluster. When a proxy connects with a cluster ID
// and if it has a different alias we should use that a cluster ID for proxy.
ClusterAliases map[cluster.ID]cluster.ID
// NewDiscoveryServer creates DiscoveryServer that sources data from Pilot's internal mesh data structures
func NewDiscoveryServer(env *model.Environment, instanceID string, clusterAliases map[string]string) *DiscoveryServer {
out := &DiscoveryServer{
Env: env,
Generators: map[string]model.XdsResourceGenerator{},
ProxyNeedsPush: DefaultProxyNeedsPush,
EndpointIndex: model.NewEndpointIndex(),
concurrentPushLimit: make(chan struct{}, features.PushThrottle),
requestRateLimit: rate.NewLimiter(rate.Limit(features.RequestLimit), 1),
InboundUpdates: atomic.NewInt64(0),
CommittedUpdates: atomic.NewInt64(0),
pushChannel: make(chan *model.PushRequest, 10),
pushQueue: NewPushQueue(),
debugHandlers: map[string]string{},
adsClients: map[string]*Connection{},
debounceOptions: debounceOptions{
debounceAfter: features.DebounceAfter,
debounceMax: features.DebounceMax,
enableEDSDebounce: features.EnableEDSDebounce,
Cache: model.DisabledCache{},
instanceID: instanceID,
out.ClusterAliases = make(map[cluster.ID]cluster.ID)
for alias := range clusterAliases {
out.ClusterAliases[cluster.ID(alias)] = cluster.ID(clusterAliases[alias])
if features.EnableXDSCaching {
out.Cache = model.NewXdsCache()
// clear the cache as endpoint shards are modified to avoid cache write race
out.ConfigGenerator = core.NewConfigGenerator(out.Cache)
return out
// initJwkResolver initializes the JWT key resolver to be used.
func (s *DiscoveryServer) initJwksResolver() {
if s.JwtKeyResolver != nil {
s.JwtKeyResolver = model.NewJwksResolver(
model.JwtPubKeyEvictionDuration, model.JwtPubKeyRefreshInterval,
model.JwtPubKeyRefreshIntervalOnFailure, model.JwtPubKeyRetryInterval)
// Flush cached discovery responses when detecting jwt public key change.
s.JwtKeyResolver.PushFunc = func() {
s.ConfigUpdate(&model.PushRequest{Full: true, Reason: []model.TriggerReason{model.UnknownTrigger}})
// closeJwksResolver shuts down the JWT key resolver used.
func (s *DiscoveryServer) closeJwksResolver() {
if s.JwtKeyResolver != nil {
s.JwtKeyResolver = nil
// Register adds the ADS handler to the grpc server
func (s *DiscoveryServer) Register(rpcs *grpc.Server) {
// Register v3 server
discovery.RegisterAggregatedDiscoveryServiceServer(rpcs, s)
var processStartTime = time.Now()
// CachesSynced is called when caches have been synced so that server can accept connections.
func (s *DiscoveryServer) CachesSynced() {
log.Infof("All caches have been synced up in %v, marking server ready", time.Since(processStartTime))
func (s *DiscoveryServer) IsServerReady() bool {
return s.serverReady.Load()
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
go s.WorkloadEntryController.Run(stopCh)
go s.handleUpdates(stopCh)
go s.periodicRefreshMetrics(stopCh)
go s.sendPushes(stopCh)
func (s *DiscoveryServer) getNonK8sRegistries() []serviceregistry.Instance {
var registries []serviceregistry.Instance
var nonK8sRegistries []serviceregistry.Instance
if agg, ok := s.Env.ServiceDiscovery.(*aggregate.Controller); ok {
registries = agg.GetRegistries()
} else {
registries = []serviceregistry.Instance{
ServiceDiscovery: s.Env.ServiceDiscovery,
for _, registry := range registries {
if registry.Provider() != provider.Kubernetes && registry.Provider() != provider.External {
nonK8sRegistries = append(nonK8sRegistries, registry)
return nonK8sRegistries
// Push metrics are updated periodically (10s default)
func (s *DiscoveryServer) periodicRefreshMetrics(stopCh <-chan struct{}) {
ticker := time.NewTicker(periodicRefreshMetrics)
defer ticker.Stop()
for {
select {
case <-ticker.C:
push := s.globalPushContext()
if model.LastPushStatus != push {
model.LastPushStatus = push
out, _ := model.LastPushStatus.StatusJSON()
if string(out) != "{}" {
log.Infof("Push Status: %s", string(out))
case <-stopCh:
// dropCacheForRequest clears the cache in response to a push request
func (s *DiscoveryServer) dropCacheForRequest(req *model.PushRequest) {
// If we don't know what updated, cannot safely cache. Clear the whole cache
if len(req.ConfigsUpdated) == 0 {
} else {
// Otherwise, just clear the updated configs
// Push is called to push changes on config updates using ADS. This is set in DiscoveryService.Push,
// to avoid direct dependencies.
func (s *DiscoveryServer) Push(req *model.PushRequest) {
if !req.Full {
req.Push = s.globalPushContext()
s.AdsPushAll(versionInfo(), req)
// Reset the status during the push.
oldPushContext := s.globalPushContext()
if oldPushContext != nil {
// Push the previous push Envoy metrics.
// PushContext is reset after a config change. Previous status is
// saved.
t0 := time.Now()
versionLocal := time.Now().Format(time.RFC3339) + "/" + strconv.FormatUint(versionNum.Inc(), 10)
push, err := s.initPushContext(req, oldPushContext, versionLocal)
if err != nil {
initContextTime := time.Since(t0)
log.Debugf("InitContext %v for push took %s", versionLocal, initContextTime)
version = versionLocal
req.Push = push
s.AdsPushAll(versionLocal, req)
func nonce(noncePrefix string) string {
return noncePrefix + uuid.New().String()
func versionInfo() string {
defer versionMutex.RUnlock()
return version
// Returns the global push context. This should be used with caution; generally the proxy-specific
// PushContext should be used to get the current state in the context of a single proxy. This should
// only be used for "global" lookups, such as initiating a new push to all proxies.
func (s *DiscoveryServer) globalPushContext() *model.PushContext {
defer s.updateMutex.RUnlock()
return s.Env.PushContext
// ConfigUpdate implements ConfigUpdater interface, used to request pushes.
// It replaces the 'clear cache' from v1.
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
s.pushChannel <- req
// Debouncing and push request happens in a separate thread, it uses locks
// and we want to avoid complications, ConfigUpdate may already hold other locks.
// handleUpdates processes events from pushChannel
// It ensures that at minimum minQuiet time has elapsed since the last event before processing it.
// It also ensures that at most maxDelay is elapsed between receiving an event and processing it.
func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) {
debounce(s.pushChannel, stopCh, s.debounceOptions, s.Push, s.CommittedUpdates)
// The debounce helper function is implemented to enable mocking
func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, opts debounceOptions, pushFn func(req *model.PushRequest), updateSent *atomic.Int64) {
var timeChan <-chan time.Time
var startDebounce time.Time
var lastConfigUpdateTime time.Time
pushCounter := 0
debouncedEvents := 0
// Keeps track of the push requests. If updates are debounce they will be merged.
var req *model.PushRequest
free := true
freeCh := make(chan struct{}, 1)
push := func(req *model.PushRequest, debouncedEvents int) {
freeCh <- struct{}{}
pushWorker := func() {
eventDelay := time.Since(startDebounce)
quietTime := time.Since(lastConfigUpdateTime)
// it has been too long or quiet enough
if eventDelay >= opts.debounceMax || quietTime >= opts.debounceAfter {
if req != nil {
if req.ConfigsUpdated == nil {
log.Infof("Push debounce stable[%d] %d for reason %s: %v since last change, %v since last push, full=%v",
pushCounter, debouncedEvents, reasonsUpdated(req),
quietTime, eventDelay, req.Full)
} else {
log.Infof("Push debounce stable[%d] %d for config %s: %v since last change, %v since last push, full=%v",
pushCounter, debouncedEvents, configsUpdated(req),
quietTime, eventDelay, req.Full)
free = false
go push(req, debouncedEvents)
req = nil
debouncedEvents = 0
} else {
timeChan = time.After(opts.debounceAfter - quietTime)
for {
select {
case <-freeCh:
free = true
case r := <-ch:
// If reason is not set, record it as an unknown reason
if len(r.Reason) == 0 {
r.Reason = []model.TriggerReason{model.UnknownTrigger}
if !opts.enableEDSDebounce && !r.Full {
// trigger push now, just for EDS
go func(req *model.PushRequest) {
lastConfigUpdateTime = time.Now()
if debouncedEvents == 0 {
timeChan = time.After(opts.debounceAfter)
startDebounce = lastConfigUpdateTime
req = req.Merge(r)
case <-timeChan:
if free {
case <-stopCh:
func configsUpdated(req *model.PushRequest) string {
configs := ""
for key := range req.ConfigsUpdated {
configs += key.String()
if len(req.ConfigsUpdated) > 1 {
more := fmt.Sprintf(" and %d more configs", len(req.ConfigsUpdated)-1)
configs += more
return configs
func reasonsUpdated(req *model.PushRequest) string {
switch len(req.Reason) {
case 0:
return "unknown"
case 1:
return string(req.Reason[0])
return fmt.Sprintf("%s and %d more reasons", req.Reason[0], len(req.Reason)-1)
func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
for {
select {
case <-stopCh:
// We can send to it until it is full, then it will block until a pushes finishes and reads from it.
// This limits the number of pushes that can happen concurrently
semaphore <- struct{}{}
// Get the next proxy to push. This will block if there are no updates required.
client, push, shuttingdown := queue.Dequeue()
if shuttingdown {
// Signals that a push is done by reading from the semaphore, allowing another send on it.
doneFunc := func() {
var closed <-chan struct{}
if != nil {
closed =
} else {
closed = client.deltaStream.Context().Done()
go func() {
pushEv := &Event{
pushRequest: push,
done: doneFunc,
select {
case client.pushChannel <- pushEv:
case <-closed: // grpc stream was closed
log.Infof("Client closed connection %v", client.conID)
// initPushContext creates a global push context and stores it on the environment. Note: while this
// method is technically thread safe (there are no data races), it should not be called in parallel;
// if it is, then we may start two push context creations (say A, and B), but then write them in
// reverse order, leaving us with a final version of A, which may be incomplete.
func (s *DiscoveryServer) initPushContext(req *model.PushRequest, oldPushContext *model.PushContext, version string) (*model.PushContext, error) {
push := model.NewPushContext()
push.PushVersion = version
push.JwtKeyResolver = s.JwtKeyResolver
if err := push.InitContext(s.Env, oldPushContext, req); err != nil {
log.Errorf("XDS: failed to init push context: %v", err)
// We can't push if we can't read the data - stick with previous version.
return nil, err
if err := s.UpdateServiceShards(push); err != nil {
return nil, err
s.Env.PushContext = push
// Ensure we drop the cache in the lock to avoid races, where we drop the cache, fill it back up, then update push context
return push, nil
func (s *DiscoveryServer) sendPushes(stopCh <-chan struct{}) {
doSendPushes(stopCh, s.concurrentPushLimit, s.pushQueue)
// InitGenerators initializes generators to be used by XdsServer.
func (s *DiscoveryServer) InitGenerators(env *model.Environment, systemNameSpace string) {
edsGen := &EdsGenerator{Server: s}
s.StatusGen = NewStatusGen(s)
s.Generators[v3.ClusterType] = &CdsGenerator{Server: s}
s.Generators[v3.ListenerType] = &LdsGenerator{Server: s}
s.Generators[v3.RouteType] = &RdsGenerator{Server: s}
s.Generators[v3.EndpointType] = edsGen
s.Generators[v3.NameTableType] = &NdsGenerator{Server: s}
s.Generators[v3.ExtensionConfigurationType] = &EcdsGenerator{Server: s}
s.Generators[v3.ProxyConfigType] = &PcdsGenerator{Server: s, TrustBundle: env.TrustBundle}
s.Generators["grpc"] = &grpcgen.GrpcConfigGenerator{}
s.Generators["grpc/"+v3.EndpointType] = edsGen
s.Generators["grpc/"+v3.ListenerType] = s.Generators["grpc"]
s.Generators["grpc/"+v3.RouteType] = s.Generators["grpc"]
s.Generators["grpc/"+v3.ClusterType] = s.Generators["grpc"]
s.Generators["dubbo"] = &dubbogen.DubboConfigGenerator{}
s.Generators[v3.DubboServiceNameMappingType] = s.Generators["dubbo"]
s.Generators["api"] = apigen.NewGenerator(env.ConfigStore)
s.Generators["api/"+v3.EndpointType] = edsGen
s.Generators["api/"+TypeURLConnect] = s.StatusGen
s.Generators["event"] = s.StatusGen
s.Generators[TypeDebug] = NewDebugGen(s, systemNameSpace)
s.Generators[v3.BootstrapType] = &BootstrapGenerator{Server: s}
// shutdown shuts down DiscoveryServer components.
func (s *DiscoveryServer) Shutdown() {
// Clients returns all currently connected clients. This method can be safely called concurrently,
// but care should be taken with the underlying objects (ie model.Proxy) to ensure proper locking.
// This method returns only fully initialized connections; for all connections, use AllClients
func (s *DiscoveryServer) Clients() []*Connection {
defer s.adsClientsMutex.RUnlock()
clients := make([]*Connection, 0, len(s.adsClients))
for _, con := range s.adsClients {
select {
case <-con.initialized:
// Initialization not complete, skip
clients = append(clients, con)
return clients
// AllClients returns all connected clients, per Clients, but additionally includes unintialized connections
// Warning: callers must take care not to rely on the con.proxy field being set
func (s *DiscoveryServer) AllClients() []*Connection {
defer s.adsClientsMutex.RUnlock()
clients := make([]*Connection, 0, len(s.adsClients))
for _, con := range s.adsClients {
clients = append(clients, con)
return clients
// SendResponse will immediately send the response to all connections.
// TODO: additional filters can be added, for example namespace.
func (s *DiscoveryServer) SendResponse(connections []*Connection, res *discovery.DiscoveryResponse) {
for _, p := range connections {
// p.send() waits for an ACK - which is reasonable for normal push,
// but in this case we want to sync fast and not bother with stuck connections.
// This is expecting a relatively small number of watchers - each other istiod
// plus few admin tools or bridges to real message brokers. The normal
// push expects 1000s of envoy connections.
con := p
go func() {
err :=
if err != nil {
log.Errorf("Failed to send internal event %s: %v", con.conID, err)
// nolint
// ClientsOf returns the clients that are watching the given resource.
func (s *DiscoveryServer) ClientsOf(typeUrl string) []*Connection {
pending := []*Connection{}
for _, v := range s.Clients() {
if v.Watching(typeUrl) {
pending = append(pending, v)
return pending
func (s *DiscoveryServer) WaitForRequestLimit(ctx context.Context) error {
if s.requestRateLimit.Limit() == 0 {
// Allow opt out when rate limiting is set to 0qps
return nil
// Give a bit of time for queue to clear out, but if not fail fast. Client will connect to another
// instance in best case, or retry with backoff.
wait, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
return s.requestRateLimit.Wait(wait)