// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package xds
import (
"context"
"fmt"
"strconv"
"sync"
"time"
)
import (
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/google/uuid"
"go.uber.org/atomic"
"golang.org/x/time/rate"
"google.golang.org/grpc"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/controller/workloadentry"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/apigen"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/core"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/core/v1alpha3/envoyfilter"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/dubbogen"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/networking/grpcgen"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/aggregate"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/memory"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider"
v3 "github.com/apache/dubbo-go-pixiu/pilot/pkg/xds/v3"
"github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/security"
)
var (
versionMutex sync.RWMutex
// version is the timestamp of the last registry event.
version = "0"
// versionNum counts versions
versionNum = atomic.NewUint64(0)
periodicRefreshMetrics = 10 * time.Second
)
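// debounceOptions groups the knobs controlling how config/registry events are debounced
// before triggering a push. For example, if debounceAfter were 100ms and debounceMax 10s
// (illustrative values only), a burst of events would be merged into a single push that
// fires once 100ms pass with no new event, or when the 10s cap is reached, whichever
// happens first.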
type debounceOptions struct {
// debounceAfter is the delay added to events to wait
// after a registry/config event for debouncing.
// This will delay the push by at least this interval, plus
// the time getting subsequent events. If no change is
// detected the push will happen, otherwise we'll keep
// delaying until things settle.
debounceAfter time.Duration
// debounceMax is the maximum time to wait for events
// while debouncing. Defaults to 10 seconds. If events keep
// showing up with no break for this time, we'll trigger a push.
debounceMax time.Duration
// enableEDSDebounce indicates whether EDS pushes should be debounced.
enableEDSDebounce bool
}
// DiscoveryServer is Pilot's gRPC implementation for Envoy's xds APIs
type DiscoveryServer struct {
// Env is the model environment.
Env *model.Environment
// MemRegistry is used for debugging and load testing; it allows services to be added. Visible for testing.
MemRegistry *memory.ServiceDiscovery
// ConfigGenerator is responsible for generating data plane configuration using Istio networking
// APIs and service registry info
ConfigGenerator core.ConfigGenerator
// Generators allow customizing the generated config, based on the client metadata.
// Key is the generator type - will match the Generator metadata to set the per-connection
// default generator, or the combination of Generator metadata and TypeUrl to select a
// different generator for a type.
// Normal istio clients use the default generator and will not be impacted by this.
Generators map[string]model.XdsResourceGenerator
// ProxyNeedsPush is a function that determines whether a push can be completely skipped. Individual generators
// may also choose to not send any updates.
ProxyNeedsPush func(proxy *model.Proxy, req *model.PushRequest) bool
// concurrentPushLimit is a semaphore that limits the amount of concurrent XDS pushes.
concurrentPushLimit chan struct{}
// requestRateLimit limits the number of new XDS requests allowed. This helps prevent a thundering herd of incoming requests.
requestRateLimit *rate.Limiter
// InboundUpdates describes the number of configuration updates the discovery server has received
InboundUpdates *atomic.Int64
// CommittedUpdates describes the number of configuration updates the discovery server has
// received, processed, and stored in the push context. If this number is less than InboundUpdates,
// there are updates we have not yet processed.
// Note: This does not mean that all proxies have received these configurations; it is strictly
// the push context, which means that the next push to a proxy will receive this configuration.
CommittedUpdates *atomic.Int64
// EndpointIndex is a global (per-server) index of endpoint shards, built from
// incremental updates. It is keyed by service and namespace.
EndpointIndex *model.EndpointIndex
// pushChannel is the buffer used for debouncing.
// After debouncing, the pushRequest will be sent to pushQueue.
pushChannel chan *model.PushRequest
// mutex used for protecting Environment.PushContext
updateMutex sync.RWMutex
// pushQueue is the buffer used after debouncing and before the real xds push.
pushQueue *PushQueue
// debugHandlers is the list of all the supported debug handlers.
debugHandlers map[string]string
// adsClients reflects active gRPC channels, for both ADS and EDS.
adsClients map[string]*Connection
adsClientsMutex sync.RWMutex
StatusReporter DistributionStatusCache
// Authenticators for XDS requests. Should be same/subset of the CA authenticators.
Authenticators []security.Authenticator
// StatusGen is notified of connect/disconnect/nack on all connections
StatusGen *StatusGen
WorkloadEntryController *workloadentry.Controller
// serverReady indicates caches have been synced up and server is ready to process requests.
serverReady atomic.Bool
debounceOptions debounceOptions
instanceID string
// Cache for XDS resources
Cache model.XdsCache
// JwtKeyResolver holds a reference to the JWT key resolver instance.
JwtKeyResolver *model.JwksResolver
// ListRemoteClusters collects debug information about other clusters this istiod reads from.
ListRemoteClusters func() []cluster.DebugInfo
// ClusterAliases are alias names for clusters. When a proxy connects with a cluster ID
// that has a different alias, we should use the alias as the cluster ID for the proxy.
ClusterAliases map[cluster.ID]cluster.ID
}
// NewDiscoveryServer creates DiscoveryServer that sources data from Pilot's internal mesh data structures
func NewDiscoveryServer(env *model.Environment, instanceID string, clusterAliases map[string]string) *DiscoveryServer {
out := &DiscoveryServer{
Env: env,
Generators: map[string]model.XdsResourceGenerator{},
ProxyNeedsPush: DefaultProxyNeedsPush,
EndpointIndex: model.NewEndpointIndex(),
concurrentPushLimit: make(chan struct{}, features.PushThrottle),
requestRateLimit: rate.NewLimiter(rate.Limit(features.RequestLimit), 1),
InboundUpdates: atomic.NewInt64(0),
CommittedUpdates: atomic.NewInt64(0),
pushChannel: make(chan *model.PushRequest, 10),
pushQueue: NewPushQueue(),
debugHandlers: map[string]string{},
adsClients: map[string]*Connection{},
debounceOptions: debounceOptions{
debounceAfter: features.DebounceAfter,
debounceMax: features.DebounceMax,
enableEDSDebounce: features.EnableEDSDebounce,
},
Cache: model.DisabledCache{},
instanceID: instanceID,
}
out.ClusterAliases = make(map[cluster.ID]cluster.ID)
for alias := range clusterAliases {
out.ClusterAliases[cluster.ID(alias)] = cluster.ID(clusterAliases[alias])
}
out.initJwksResolver()
if features.EnableXDSCaching {
out.Cache = model.NewXdsCache()
// clear the cache as endpoint shards are modified to avoid cache write race
out.EndpointIndex.SetCache(out.Cache)
}
out.ConfigGenerator = core.NewConfigGenerator(out.Cache)
return out
}
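// A rough usage sketch (illustrative only; the real wiring lives in the server bootstrap,
// and the caller is still responsible for setting up controllers such as
// WorkloadEntryController and calling InitGenerators before serving traffic). It assumes
// env is an already-populated *model.Environment and "istio-system" is a placeholder
// system namespace:
//
//	s := NewDiscoveryServer(env, "instance-1", nil)
//	s.InitGenerators(env, "istio-system")
//	grpcServer := grpc.NewServer()
//	s.Register(grpcServer)
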
// initJwksResolver initializes the JWT key resolver to be used.
func (s *DiscoveryServer) initJwksResolver() {
if s.JwtKeyResolver != nil {
s.closeJwksResolver()
}
s.JwtKeyResolver = model.NewJwksResolver(
model.JwtPubKeyEvictionDuration, model.JwtPubKeyRefreshInterval,
model.JwtPubKeyRefreshIntervalOnFailure, model.JwtPubKeyRetryInterval)
// Flush cached discovery responses when detecting jwt public key change.
s.JwtKeyResolver.PushFunc = func() {
s.ConfigUpdate(&model.PushRequest{Full: true, Reason: []model.TriggerReason{model.UnknownTrigger}})
}
}
// closeJwksResolver shuts down the JWT key resolver used.
func (s *DiscoveryServer) closeJwksResolver() {
if s.JwtKeyResolver != nil {
s.JwtKeyResolver.Close()
}
s.JwtKeyResolver = nil
}
// Register adds the ADS handler to the grpc server
func (s *DiscoveryServer) Register(rpcs *grpc.Server) {
// Register v3 server
discovery.RegisterAggregatedDiscoveryServiceServer(rpcs, s)
}
var processStartTime = time.Now()
// CachesSynced is called when caches have been synced so that server can accept connections.
func (s *DiscoveryServer) CachesSynced() {
log.Infof("All caches have been synced up in %v, marking server ready", time.Since(processStartTime))
s.serverReady.Store(true)
}
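// IsServerReady reports whether the caches have been synced and the server is ready to
// process requests.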
func (s *DiscoveryServer) IsServerReady() bool {
return s.serverReady.Load()
}
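// Start launches the background workers: the workload entry controller, the debounced
// update handler, periodic metrics refresh, and the push dispatcher. They run until
// stopCh is closed.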
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
go s.WorkloadEntryController.Run(stopCh)
go s.handleUpdates(stopCh)
go s.periodicRefreshMetrics(stopCh)
go s.sendPushes(stopCh)
}
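// getNonK8sRegistries returns the service registries whose provider is neither
// Kubernetes nor External.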
func (s *DiscoveryServer) getNonK8sRegistries() []serviceregistry.Instance {
var registries []serviceregistry.Instance
var nonK8sRegistries []serviceregistry.Instance
if agg, ok := s.Env.ServiceDiscovery.(*aggregate.Controller); ok {
registries = agg.GetRegistries()
} else {
registries = []serviceregistry.Instance{
serviceregistry.Simple{
ServiceDiscovery: s.Env.ServiceDiscovery,
},
}
}
for _, registry := range registries {
if registry.Provider() != provider.Kubernetes && registry.Provider() != provider.External {
nonK8sRegistries = append(nonK8sRegistries, registry)
}
}
return nonK8sRegistries
}
// Push metrics are updated periodically (10s default)
func (s *DiscoveryServer) periodicRefreshMetrics(stopCh <-chan struct{}) {
ticker := time.NewTicker(periodicRefreshMetrics)
defer ticker.Stop()
for {
select {
case <-ticker.C:
push := s.globalPushContext()
model.LastPushMutex.Lock()
if model.LastPushStatus != push {
model.LastPushStatus = push
push.UpdateMetrics()
out, _ := model.LastPushStatus.StatusJSON()
if string(out) != "{}" {
log.Infof("Push Status: %s", string(out))
}
}
model.LastPushMutex.Unlock()
case <-stopCh:
return
}
}
}
// dropCacheForRequest clears the cache in response to a push request
func (s *DiscoveryServer) dropCacheForRequest(req *model.PushRequest) {
// If we don't know what updated, cannot safely cache. Clear the whole cache
if len(req.ConfigsUpdated) == 0 {
s.Cache.ClearAll()
} else {
// Otherwise, just clear the updated configs
s.Cache.Clear(req.ConfigsUpdated)
}
}
// Push is called to push changes on config updates using ADS. This is set in DiscoveryService.Push,
// to avoid direct dependencies.
func (s *DiscoveryServer) Push(req *model.PushRequest) {
if !req.Full {
req.Push = s.globalPushContext()
s.dropCacheForRequest(req)
s.AdsPushAll(versionInfo(), req)
return
}
// Reset the status during the push.
oldPushContext := s.globalPushContext()
if oldPushContext != nil {
oldPushContext.OnConfigChange()
// Push the previous push Envoy metrics.
envoyfilter.RecordMetrics()
}
// PushContext is reset after a config change. Previous status is
// saved.
t0 := time.Now()
versionLocal := time.Now().Format(time.RFC3339) + "/" + strconv.FormatUint(versionNum.Inc(), 10)
push, err := s.initPushContext(req, oldPushContext, versionLocal)
if err != nil {
return
}
initContextTime := time.Since(t0)
log.Debugf("InitContext %v for push took %s", versionLocal, initContextTime)
pushContextInitTime.Record(initContextTime.Seconds())
versionMutex.Lock()
version = versionLocal
versionMutex.Unlock()
req.Push = push
s.AdsPushAll(versionLocal, req)
}
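// nonce returns a new nonce string by appending a random UUID to the given prefix.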
func nonce(noncePrefix string) string {
return noncePrefix + uuid.New().String()
}
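// versionInfo returns the current global push version under a read lock.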
func versionInfo() string {
versionMutex.RLock()
defer versionMutex.RUnlock()
return version
}
// globalPushContext returns the global push context. This should be used with caution; generally the proxy-specific
// PushContext should be used to get the current state in the context of a single proxy. This should
// only be used for "global" lookups, such as initiating a new push to all proxies.
func (s *DiscoveryServer) globalPushContext() *model.PushContext {
s.updateMutex.RLock()
defer s.updateMutex.RUnlock()
return s.Env.PushContext
}
// ConfigUpdate implements the ConfigUpdater interface, used to request pushes.
// It replaces the 'clear cache' from v1.
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
inboundConfigUpdates.Increment()
s.InboundUpdates.Inc()
s.pushChannel <- req
}
// Debouncing and pushing happen in a separate goroutine; it uses locks and we want to avoid
// complications, since ConfigUpdate may already hold other locks.
// handleUpdates processes events from pushChannel.
// It ensures that at least debounceAfter has elapsed since the last event before processing it.
// It also ensures that at most debounceMax elapses between receiving an event and processing it.
func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) {
debounce(s.pushChannel, stopCh, s.debounceOptions, s.Push, s.CommittedUpdates)
}
// The debounce helper function is implemented to enable mocking
func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, opts debounceOptions, pushFn func(req *model.PushRequest), updateSent *atomic.Int64) {
var timeChan <-chan time.Time
var startDebounce time.Time
var lastConfigUpdateTime time.Time
pushCounter := 0
debouncedEvents := 0
// Keeps track of the push requests. If updates are debounced they will be merged.
var req *model.PushRequest
free := true
freeCh := make(chan struct{}, 1)
push := func(req *model.PushRequest, debouncedEvents int) {
pushFn(req)
updateSent.Add(int64(debouncedEvents))
freeCh <- struct{}{}
}
pushWorker := func() {
eventDelay := time.Since(startDebounce)
quietTime := time.Since(lastConfigUpdateTime)
// it has been too long or quiet enough
if eventDelay >= opts.debounceMax || quietTime >= opts.debounceAfter {
if req != nil {
pushCounter++
if req.ConfigsUpdated == nil {
log.Infof("Push debounce stable[%d] %d for reason %s: %v since last change, %v since last push, full=%v",
pushCounter, debouncedEvents, reasonsUpdated(req),
quietTime, eventDelay, req.Full)
} else {
log.Infof("Push debounce stable[%d] %d for config %s: %v since last change, %v since last push, full=%v",
pushCounter, debouncedEvents, configsUpdated(req),
quietTime, eventDelay, req.Full)
}
free = false
go push(req, debouncedEvents)
req = nil
debouncedEvents = 0
}
} else {
timeChan = time.After(opts.debounceAfter - quietTime)
}
}
for {
select {
case <-freeCh:
free = true
pushWorker()
case r := <-ch:
// If reason is not set, record it as an unknown reason
if len(r.Reason) == 0 {
r.Reason = []model.TriggerReason{model.UnknownTrigger}
}
if !opts.enableEDSDebounce && !r.Full {
// trigger push now, just for EDS
go func(req *model.PushRequest) {
pushFn(req)
updateSent.Inc()
}(r)
continue
}
lastConfigUpdateTime = time.Now()
if debouncedEvents == 0 {
timeChan = time.After(opts.debounceAfter)
startDebounce = lastConfigUpdateTime
}
debouncedEvents++
req = req.Merge(r)
case <-timeChan:
if free {
pushWorker()
}
case <-stopCh:
return
}
}
}
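// configsUpdated summarizes the updated configs of a push request as a single config key
// plus a count of any remaining configs.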
func configsUpdated(req *model.PushRequest) string {
configs := ""
for key := range req.ConfigsUpdated {
configs += key.String()
break
}
if len(req.ConfigsUpdated) > 1 {
more := fmt.Sprintf(" and %d more configs", len(req.ConfigsUpdated)-1)
configs += more
}
return configs
}
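// reasonsUpdated summarizes the trigger reasons of a push request as the first reason
// plus a count of any remaining reasons.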
func reasonsUpdated(req *model.PushRequest) string {
switch len(req.Reason) {
case 0:
return "unknown"
case 1:
return string(req.Reason[0])
default:
return fmt.Sprintf("%s and %d more reasons", req.Reason[0], len(req.Reason)-1)
}
}
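// doSendPushes dequeues proxies from the push queue and dispatches push events to their
// push channels, using the semaphore to bound the number of concurrent pushes. It runs
// until stopCh is closed or the queue shuts down.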
func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
for {
select {
case <-stopCh:
return
default:
// We can send to the semaphore until it is full, then it will block until a push finishes and reads from it.
// This limits the number of pushes that can happen concurrently.
semaphore <- struct{}{}
// Get the next proxy to push. This will block if there are no updates required.
client, push, shuttingdown := queue.Dequeue()
if shuttingdown {
return
}
recordPushTriggers(push.Reason...)
// Signals that a push is done by reading from the semaphore, allowing another send on it.
doneFunc := func() {
queue.MarkDone(client)
<-semaphore
}
proxiesQueueTime.Record(time.Since(push.Start).Seconds())
var closed <-chan struct{}
if client.stream != nil {
closed = client.stream.Context().Done()
} else {
closed = client.deltaStream.Context().Done()
}
go func() {
pushEv := &Event{
pushRequest: push,
done: doneFunc,
}
select {
case client.pushChannel <- pushEv:
return
case <-closed: // grpc stream was closed
doneFunc()
log.Infof("Client closed connection %v", client.conID)
}
}()
}
}
}
// initPushContext creates a global push context and stores it on the environment. Note: while this
// method is technically thread safe (there are no data races), it should not be called in parallel;
// if it is, then we may start two push context creations (say A, and B), but then write them in
// reverse order, leaving us with a final version of A, which may be incomplete.
func (s *DiscoveryServer) initPushContext(req *model.PushRequest, oldPushContext *model.PushContext, version string) (*model.PushContext, error) {
push := model.NewPushContext()
push.PushVersion = version
push.JwtKeyResolver = s.JwtKeyResolver
if err := push.InitContext(s.Env, oldPushContext, req); err != nil {
log.Errorf("XDS: failed to init push context: %v", err)
// We can't push if we can't read the data - stick with previous version.
pushContextErrors.Increment()
return nil, err
}
if err := s.UpdateServiceShards(push); err != nil {
return nil, err
}
s.updateMutex.Lock()
s.Env.PushContext = push
// Ensure we drop the cache while holding the lock to avoid races where we drop the cache, fill it back up, then update the push context.
s.dropCacheForRequest(req)
s.updateMutex.Unlock()
return push, nil
}
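// sendPushes runs the push dispatch loop, bounded by concurrentPushLimit, until stopCh
// is closed.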
func (s *DiscoveryServer) sendPushes(stopCh <-chan struct{}) {
doSendPushes(stopCh, s.concurrentPushLimit, s.pushQueue)
}
// InitGenerators initializes generators to be used by XdsServer.
func (s *DiscoveryServer) InitGenerators(env *model.Environment, systemNameSpace string) {
edsGen := &EdsGenerator{Server: s}
s.StatusGen = NewStatusGen(s)
s.Generators[v3.ClusterType] = &CdsGenerator{Server: s}
s.Generators[v3.ListenerType] = &LdsGenerator{Server: s}
s.Generators[v3.RouteType] = &RdsGenerator{Server: s}
s.Generators[v3.EndpointType] = edsGen
s.Generators[v3.NameTableType] = &NdsGenerator{Server: s}
s.Generators[v3.ExtensionConfigurationType] = &EcdsGenerator{Server: s}
s.Generators[v3.ProxyConfigType] = &PcdsGenerator{Server: s, TrustBundle: env.TrustBundle}
s.Generators["grpc"] = &grpcgen.GrpcConfigGenerator{}
s.Generators["grpc/"+v3.EndpointType] = edsGen
s.Generators["grpc/"+v3.ListenerType] = s.Generators["grpc"]
s.Generators["grpc/"+v3.RouteType] = s.Generators["grpc"]
s.Generators["grpc/"+v3.ClusterType] = s.Generators["grpc"]
s.Generators["dubbo"] = &dubbogen.DubboConfigGenerator{}
s.Generators[v3.DubboServiceNameMappingType] = s.Generators["dubbo"]
s.Generators["api"] = apigen.NewGenerator(env.ConfigStore)
s.Generators["api/"+v3.EndpointType] = edsGen
s.Generators["api/"+TypeURLConnect] = s.StatusGen
s.Generators["event"] = s.StatusGen
s.Generators[TypeDebug] = NewDebugGen(s, systemNameSpace)
s.Generators[v3.BootstrapType] = &BootstrapGenerator{Server: s}
}
// Shutdown shuts down DiscoveryServer components.
func (s *DiscoveryServer) Shutdown() {
s.closeJwksResolver()
s.pushQueue.ShutDown()
}
// Clients returns all currently connected clients. This method can be safely called concurrently,
// but care should be taken with the underlying objects (i.e. model.Proxy) to ensure proper locking.
// This method returns only fully initialized connections; for all connections, use AllClients.
func (s *DiscoveryServer) Clients() []*Connection {
s.adsClientsMutex.RLock()
defer s.adsClientsMutex.RUnlock()
clients := make([]*Connection, 0, len(s.adsClients))
for _, con := range s.adsClients {
select {
case <-con.initialized:
default:
// Initialization not complete, skip
continue
}
clients = append(clients, con)
}
return clients
}
// AllClients returns all connected clients, per Clients, but additionally includes uninitialized connections.
// Warning: callers must take care not to rely on the con.proxy field being set.
func (s *DiscoveryServer) AllClients() []*Connection {
s.adsClientsMutex.RLock()
defer s.adsClientsMutex.RUnlock()
clients := make([]*Connection, 0, len(s.adsClients))
for _, con := range s.adsClients {
clients = append(clients, con)
}
return clients
}
// SendResponse will immediately send the response to all connections.
// TODO: additional filters can be added, for example namespace.
func (s *DiscoveryServer) SendResponse(connections []*Connection, res *discovery.DiscoveryResponse) {
for _, p := range connections {
// p.send() waits for an ACK - which is reasonable for normal push,
// but in this case we want to sync fast and not bother with stuck connections.
// This is expecting a relatively small number of watchers - each other istiod
// plus a few admin tools or bridges to real message brokers. The normal
// push expects 1000s of envoy connections.
con := p
go func() {
err := con.stream.Send(res)
if err != nil {
log.Errorf("Failed to send internal event %s: %v", con.conID, err)
}
}()
}
}
// nolint
// ClientsOf returns the clients that are watching the given resource.
func (s *DiscoveryServer) ClientsOf(typeUrl string) []*Connection {
pending := []*Connection{}
for _, v := range s.Clients() {
if v.Watching(typeUrl) {
pending = append(pending, v)
}
}
return pending
}
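// WaitForRequestLimit blocks until the request rate limiter allows a new XDS request,
// the one-second timeout expires, or ctx is done. A rate limit of 0 disables limiting
// entirely.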
func (s *DiscoveryServer) WaitForRequestLimit(ctx context.Context) error {
if s.requestRateLimit.Limit() == 0 {
// Allow opting out when rate limiting is set to 0 qps.
return nil
}
// Give a bit of time for queue to clear out, but if not fail fast. Client will connect to another
// instance in best case, or retry with backoff.
wait, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
return s.requestRateLimit.Wait(wait)
}