| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| // Copyright 2018 Envoyproxy Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package v3 |
| |
| import ( |
| "context" |
| "fmt" |
| "sync" |
| "sync/atomic" |
| "time" |
| ) |
| |
| import ( |
| envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" |
| "github.com/envoyproxy/go-control-plane/pkg/cache/types" |
| envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3" |
| "github.com/envoyproxy/go-control-plane/pkg/log" |
| "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" |
| ) |
| |
| type Snapshot interface { |
| // GetSupportedTypes returns a list of xDS types supported by this snapshot. |
| GetSupportedTypes() []string |
| |
| // Consistent check verifies that the dependent resources are exactly listed in the |
| // snapshot: |
| // - all EDS resources are listed by name in CDS resources |
| // - all RDS resources are listed by name in LDS resources |
| // |
| // Note that clusters and listeners are requested without name references, so |
| // Envoy will accept the snapshot list of clusters as-is even if it does not match |
| // all references found in xDS. |
| Consistent() error |
| |
| // GetResources selects snapshot resources by type. |
| GetResources(typ string) map[string]types.Resource |
| |
| // GetVersion returns the version for a resource type. |
| GetVersion(typ string) string |
| |
| // WithVersion creates a new snapshot with a different version for a given resource type. |
| WithVersion(typ string, version string) Snapshot |
| } |
| |
| // SnapshotCache is a snapshot-based envoy_cache that maintains a single versioned |
| // snapshot of responses per node. SnapshotCache consistently replies with the |
| // latest snapshot. For the protocol to work correctly in ADS mode, EDS/RDS |
| // requests are responded only when all resources in the snapshot xDS response |
| // are named as part of the request. It is expected that the CDS response names |
| // all EDS clusters, and the LDS response names all RDS routes in a snapshot, |
| // to ensure that Envoy makes the request for all EDS clusters or RDS routes |
| // eventually. |
| // |
| // SnapshotCache can operate as a REST or regular xDS backend. The snapshot |
| // can be partial, e.g. only include RDS or EDS resources. |
| type SnapshotCache interface { |
| envoy_cache.Cache |
| |
| // SetSnapshot sets a response snapshot for a node. For ADS, the snapshots |
| // should have distinct versions and be internally consistent (e.g. all |
| // referenced resources must be included in the snapshot). |
| // |
| // This method will cause the server to respond to all open watches, for which |
| // the version differs from the snapshot version. |
| SetSnapshot(node string, snapshot Snapshot) error |
| |
| // GetSnapshot gets the snapshot for a node. |
| GetSnapshot(node string) (Snapshot, error) |
| |
| // HasSnapshot checks whether there is a snapshot present for a node. |
| HasSnapshot(node string) bool |
| |
| // ClearSnapshot removes all status and snapshot information associated with a node. Return the removed snapshot or nil |
| ClearSnapshot(node string) Snapshot |
| |
| // GetStatusInfo retrieves status information for a node ID. |
| GetStatusInfo(string) StatusInfo |
| |
| // GetStatusKeys retrieves node IDs for all statuses. |
| GetStatusKeys() []string |
| } |
| |
| // Generates a snapshot of xDS resources for a given node. |
| type SnapshotGenerator interface { |
| GenerateSnapshot(context.Context, *envoy_config_core_v3.Node) (Snapshot, error) |
| } |
| |
| type snapshotCache struct { |
| // watchCount is an atomic counter incremented for each watch. This needs to |
| // be the first field in the struct to guarantee that it is 64-bit aligned, |
| // which is a requirement for atomic operations on 64-bit operands to work on |
| // 32-bit machines. |
| watchCount int64 |
| |
| log log.Logger |
| |
| // ads flag to hold responses until all resources are named |
| ads bool |
| |
| // snapshots are cached resources indexed by node IDs |
| snapshots map[string]Snapshot |
| |
| // status information for all nodes indexed by node IDs |
| status map[string]*statusInfo |
| |
| // hash is the hashing function for Envoy nodes |
| hash NodeHash |
| |
| mu sync.RWMutex |
| } |
| |
| // NewSnapshotCache initializes a simple envoy_cache. |
| // |
| // ADS flag forces a delay in responding to streaming requests until all |
| // resources are explicitly named in the request. This avoids the problem of a |
| // partial request over a single stream for a subset of resources which would |
| // require generating a fresh version for acknowledgement. ADS flag requires |
| // snapshot consistency. For non-ADS case (and fetch), multiple partial |
| // requests are sent across multiple streams and re-using the snapshot version |
| // is OK. |
| // |
| // Logger is optional. |
| func NewSnapshotCache(ads bool, hash NodeHash, logger log.Logger) SnapshotCache { |
| return &snapshotCache{ |
| log: logger, |
| ads: ads, |
| snapshots: make(map[string]Snapshot), |
| status: make(map[string]*statusInfo), |
| hash: hash, |
| } |
| } |
| |
| // SetSnapshotCache updates a snapshot for a node. |
| func (cache *snapshotCache) SetSnapshot(node string, snapshot Snapshot) error { |
| cache.mu.Lock() |
| defer cache.mu.Unlock() |
| |
| // update the existing entry |
| cache.snapshots[node] = snapshot |
| |
| // trigger existing watches for which version changed |
| if info, ok := cache.status[node]; ok { |
| info.mu.Lock() |
| for id, watch := range info.watches { |
| version := snapshot.GetVersion(watch.Request.TypeUrl) |
| if version != watch.Request.VersionInfo { |
| if cache.log != nil { |
| cache.log.Debugf("respond open watch %d%v with new version %q", id, watch.Request.ResourceNames, version) |
| } |
| cache.respond(watch.Request, watch.Response, snapshot.GetResources(watch.Request.TypeUrl), version) |
| |
| // discard the watch |
| delete(info.watches, id) |
| } |
| } |
| info.mu.Unlock() |
| } |
| |
| return nil |
| } |
| |
| // GetSnapshots gets the snapshot for a node, and returns an error if not found. |
| func (cache *snapshotCache) GetSnapshot(node string) (Snapshot, error) { |
| cache.mu.RLock() |
| defer cache.mu.RUnlock() |
| |
| snap, ok := cache.snapshots[node] |
| if !ok { |
| return nil, fmt.Errorf("no snapshot found for node %s", node) |
| } |
| return snap, nil |
| } |
| |
| func (cache *snapshotCache) HasSnapshot(node string) bool { |
| cache.mu.RLock() |
| defer cache.mu.RUnlock() |
| |
| _, ok := cache.snapshots[node] |
| return ok |
| } |
| |
| // ClearSnapshot clears snapshot and info for a node. |
| func (cache *snapshotCache) ClearSnapshot(node string) Snapshot { |
| cache.mu.Lock() |
| defer cache.mu.Unlock() |
| |
| snapshot := cache.snapshots[node] |
| delete(cache.snapshots, node) |
| delete(cache.status, node) |
| return snapshot |
| } |
| |
// nameSet returns a set containing every string in names, represented as a
// map whose values are all true. Duplicates collapse into one entry. The
// result is never nil, even for a nil or empty input.
func nameSet(names []string) map[string]bool {
	// len(names) is an upper bound on the final size, so pre-size the map to
	// avoid incremental growth.
	set := make(map[string]bool, len(names))
	for _, name := range names {
		set[name] = true
	}
	return set
}
| |
| // superset checks that all resources are listed in the names set. |
| func superset(names map[string]bool, resources map[string]types.Resource) error { |
| for resourceName := range resources { |
| if _, exists := names[resourceName]; !exists { |
| return fmt.Errorf("%q not listed", resourceName) |
| } |
| } |
| return nil |
| } |
| |
| func (cache *snapshotCache) CreateDeltaWatch(*envoy_cache.DeltaRequest, stream.StreamState, chan envoy_cache.DeltaResponse) func() { |
| return nil |
| } |
| |
| // CreateWatch returns a watch for an xDS request. |
| func (cache *snapshotCache) CreateWatch(request *envoy_cache.Request, _ stream.StreamState, responseChan chan envoy_cache.Response) func() { |
| nodeID := cache.hash.ID(request.Node) |
| |
| cache.mu.Lock() |
| defer cache.mu.Unlock() |
| info, ok := cache.status[nodeID] |
| if !ok { |
| info = newStatusInfo(request.Node) |
| cache.status[nodeID] = info |
| } |
| |
| // update last watch request time |
| info.mu.Lock() |
| info.lastWatchRequestTime = time.Now() |
| info.mu.Unlock() |
| |
| snapshot, exists := cache.snapshots[nodeID] |
| version := "" |
| if exists { |
| version = snapshot.GetVersion(request.TypeUrl) |
| } |
| |
| // if the requested version is up-to-date or missing a response, leave an open watch |
| if !exists || request.VersionInfo == version { |
| watchID := cache.nextWatchID() |
| if cache.log != nil { |
| cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID, |
| request.TypeUrl, request.ResourceNames, nodeID, request.VersionInfo) |
| } |
| info.mu.Lock() |
| info.watches[watchID] = ResponseWatch{Request: request, Response: responseChan} |
| info.mu.Unlock() |
| return cache.cancelWatch(nodeID, watchID) |
| } |
| |
| // otherwise, the watch may be responded immediately |
| cache.respond(request, responseChan, snapshot.GetResources(request.TypeUrl), version) |
| |
| return nil |
| } |
| |
| func (cache *snapshotCache) nextWatchID() int64 { |
| return atomic.AddInt64(&cache.watchCount, 1) |
| } |
| |
| // cancellation function for cleaning stale watches |
| func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() { |
| return func() { |
| // uses the envoy_cache mutex |
| cache.mu.Lock() |
| defer cache.mu.Unlock() |
| if info, ok := cache.status[nodeID]; ok { |
| info.mu.Lock() |
| delete(info.watches, watchID) |
| info.mu.Unlock() |
| } |
| } |
| } |
| |
| // Respond to a watch with the snapshot value. The value channel should have capacity not to block. |
| // TODO(kuat) do not respond always, see issue https://github.com/envoyproxy/go-control-plane/issues/46 |
| func (cache *snapshotCache) respond(request *envoy_cache.Request, value chan envoy_cache.Response, resources map[string]types.Resource, version string) { |
| // for ADS, the request names must match the snapshot names |
| // if they do not, then the watch is never responded, and it is expected that envoy makes another request |
| if len(request.ResourceNames) != 0 && cache.ads { |
| if err := superset(nameSet(request.ResourceNames), resources); err != nil { |
| if cache.log != nil { |
| cache.log.Debugf("ADS mode: not responding to request: %v", err) |
| } |
| return |
| } |
| } |
| if cache.log != nil { |
| cache.log.Debugf("respond %s%v version %q with version %q", |
| request.TypeUrl, request.ResourceNames, request.VersionInfo, version) |
| } |
| |
| value <- createResponse(request, resources, version) |
| } |
| |
| func createResponse(request *envoy_cache.Request, resources map[string]types.Resource, version string) envoy_cache.Response { |
| filtered := make([]types.ResourceWithTTL, 0, len(resources)) |
| |
| // Reply only with the requested resources. Envoy may ask each resource |
| // individually in a separate stream. It is ok to reply with the same version |
| // on separate streams since requests do not share their response versions. |
| if len(request.ResourceNames) != 0 { |
| set := nameSet(request.ResourceNames) |
| for name, resource := range resources { |
| if set[name] { |
| filtered = append(filtered, types.ResourceWithTTL{Resource: resource}) |
| } |
| } |
| } else { |
| for _, resource := range resources { |
| filtered = append(filtered, types.ResourceWithTTL{Resource: resource}) |
| } |
| } |
| |
| return &envoy_cache.RawResponse{ |
| Request: request, |
| Version: version, |
| Resources: filtered, |
| } |
| } |
| |
| // Fetch implements the envoy_cache fetch function. |
| // Fetch is called on multiple streams, so responding to individual names with the same version works. |
| // If there is a Deadline set on the context, the call will block until either the context is terminated |
| // or there is a new update. |
| func (cache *snapshotCache) Fetch(ctx context.Context, request *envoy_cache.Request) (envoy_cache.Response, error) { |
| if _, hasDeadline := ctx.Deadline(); hasDeadline { |
| return cache.blockingFetch(ctx, request) |
| } |
| |
| nodeID := cache.hash.ID(request.Node) |
| |
| cache.mu.RLock() |
| defer cache.mu.RUnlock() |
| |
| if snapshot, exists := cache.snapshots[nodeID]; exists { |
| // Respond only if the request version is distinct from the current snapshot state. |
| // It might be beneficial to hold the request since Envoy will re-attempt the refresh. |
| version := snapshot.GetVersion(request.TypeUrl) |
| if request.VersionInfo == version { |
| if cache.log != nil { |
| cache.log.Warnf("skip fetch: version up to date") |
| } |
| return nil, &types.SkipFetchError{} |
| } |
| |
| resources := snapshot.GetResources(request.TypeUrl) |
| out := createResponse(request, resources, version) |
| return out, nil |
| } |
| |
| return nil, fmt.Errorf("missing snapshot for %q", nodeID) |
| } |
| |
| // blockingFetch will wait until either the context is terminated or new resources become available |
| func (cache *snapshotCache) blockingFetch(ctx context.Context, request *envoy_cache.Request) (envoy_cache.Response, error) { |
| responseChan := make(chan envoy_cache.Response, 1) |
| cancelFunc := cache.CreateWatch(request, stream.StreamState{}, responseChan) |
| if cancelFunc != nil { |
| defer cancelFunc() |
| } |
| |
| select { |
| case <-ctx.Done(): |
| // finished without an update |
| return nil, &types.SkipFetchError{} |
| case resp := <-responseChan: |
| return resp, nil |
| } |
| } |
| |
| // GetStatusInfo retrieves the status info for the node. |
| func (cache *snapshotCache) GetStatusInfo(node string) StatusInfo { |
| cache.mu.RLock() |
| defer cache.mu.RUnlock() |
| |
| info, exists := cache.status[node] |
| if !exists { |
| if cache.log != nil { |
| cache.log.Warnf("node does not exist") |
| } |
| return nil |
| } |
| |
| return info |
| } |
| |
| // GetStatusKeys retrieves all node IDs in the status map. |
| func (cache *snapshotCache) GetStatusKeys() []string { |
| cache.mu.RLock() |
| defer cache.mu.RUnlock() |
| |
| out := make([]string, 0, len(cache.status)) |
| for id := range cache.status { |
| out = append(out, id) |
| } |
| |
| return out |
| } |
| |
| // NodeHash computes string identifiers for Envoy nodes. |
| type NodeHash interface { |
| // ID function defines a unique string identifier for the remote Envoy node. |
| ID(node *envoy_config_core_v3.Node) string |
| } |
| |
| // IDHash uses ID field as the node hash. |
| type IDHash struct{} |
| |
| // ID uses the node ID field |
| func (IDHash) ID(node *envoy_config_core_v3.Node) string { |
| if node == nil { |
| return "" |
| } |
| return node.Id |
| } |
| |
| var _ NodeHash = IDHash{} |
| |
| // StatusInfo tracks the server state for the remote Envoy node. |
| // Not all fields are used by all envoy_cache implementations. |
| type StatusInfo interface { |
| // GetNode returns the node metadata. |
| GetNode() *envoy_config_core_v3.Node |
| |
| // GetNumWatches returns the number of open watches. |
| GetNumWatches() int |
| |
| // GetLastWatchRequestTime returns the timestamp of the last discovery watch request. |
| GetLastWatchRequestTime() time.Time |
| } |
| |
| type statusInfo struct { |
| // node is the constant Envoy node metadata. |
| node *envoy_config_core_v3.Node |
| |
| // watches are indexed channels for the response watches and the original requests. |
| watches map[int64]ResponseWatch |
| |
| // the timestamp of the last watch request |
| lastWatchRequestTime time.Time |
| |
| // mutex to protect the status fields. |
| // should not acquire mutex of the parent envoy_cache after acquiring this mutex. |
| mu sync.RWMutex |
| } |
| |
| // ResponseWatch is a watch record keeping both the request and an open channel for the response. |
| type ResponseWatch struct { |
| // Request is the original request for the watch. |
| Request *envoy_cache.Request |
| |
| // Response is the channel to push responses to. |
| Response chan envoy_cache.Response |
| } |
| |
| // newStatusInfo initializes a status info data structure. |
| func newStatusInfo(node *envoy_config_core_v3.Node) *statusInfo { |
| out := statusInfo{ |
| node: node, |
| watches: make(map[int64]ResponseWatch), |
| } |
| return &out |
| } |
| |
| func (info *statusInfo) GetNode() *envoy_config_core_v3.Node { |
| info.mu.RLock() |
| defer info.mu.RUnlock() |
| return info.node |
| } |
| |
| func (info *statusInfo) GetNumWatches() int { |
| info.mu.RLock() |
| defer info.mu.RUnlock() |
| return len(info.watches) |
| } |
| |
| func (info *statusInfo) GetLastWatchRequestTime() time.Time { |
| info.mu.RLock() |
| defer info.mu.RUnlock() |
| return info.lastWatchRequestTime |
| } |