// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
"fmt"
"sync"
"time"
)
import (
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/google/go-cmp/cmp"
"github.com/hashicorp/golang-lru/simplelru"
"google.golang.org/protobuf/testing/protocmp"
"istio.io/pkg/monitoring"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)
func init() {
monitoring.MustRegister(xdsCacheReads)
monitoring.MustRegister(xdsCacheEvictions)
monitoring.MustRegister(xdsCacheSize)
}
var (
xdsCacheReads = monitoring.NewSum(
"xds_cache_reads",
"Total number of xds cache reads.",
monitoring.WithLabels(typeTag),
)
xdsCacheEvictions = monitoring.NewSum(
"xds_cache_evictions",
"Total number of xds cache evictions.",
)
xdsCacheSize = monitoring.NewGauge(
"xds_cache_size",
"Current size of xds cache.",
)
xdsCacheHits = xdsCacheReads.With(typeTag.Value("hit"))
xdsCacheMisses = xdsCacheReads.With(typeTag.Value("miss"))
)
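// hit increments the xds cache hit counter if cache metrics are enabled.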
func hit() {
if features.EnableXDSCacheMetrics {
xdsCacheHits.Increment()
}
}
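// miss increments the xds cache miss counter if cache metrics are enabled.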
func miss() {
if features.EnableXDSCacheMetrics {
xdsCacheMisses.Increment()
}
}
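// size records the current number of cached entries if cache metrics are enabled.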
func size(cs int) {
if features.EnableXDSCacheMetrics {
xdsCacheSize.Record(float64(cs))
}
}
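// indexConfig records that the cache key k depends on each config in dependentConfigs,
// so the entry can be invalidated when any of those configs change.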
func indexConfig(configIndex map[ConfigKey]sets.Set, k string, dependentConfigs []ConfigKey) {
for _, cfg := range dependentConfigs {
if configIndex[cfg] == nil {
configIndex[cfg] = sets.New()
}
configIndex[cfg].Insert(k)
}
}
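// clearIndexConfig removes the cache key k from the index of each config in
// dependentConfigs, dropping index entries that become empty.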
func clearIndexConfig(configIndex map[ConfigKey]sets.Set, k string, dependentConfigs []ConfigKey) {
for _, cfg := range dependentConfigs {
index := configIndex[cfg]
if index != nil {
index.Delete(k)
if index.IsEmpty() {
delete(configIndex, cfg)
}
}
}
}
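// indexType records that the cache key k depends on each type in dependentTypes,
// so the entry can be invalidated when any config of those types changes.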
func indexType(typeIndex map[config.GroupVersionKind]sets.Set, k string, dependentTypes []config.GroupVersionKind) {
for _, t := range dependentTypes {
if typeIndex[t] == nil {
typeIndex[t] = sets.New()
}
typeIndex[t].Insert(k)
}
}
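// clearIndexType removes the cache key k from the index of each type in
// dependentTypes, dropping index entries that become empty.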
func clearIndexType(typeIndex map[config.GroupVersionKind]sets.Set, k string, dependentTypes []config.GroupVersionKind) {
for _, t := range dependentTypes {
index := typeIndex[t]
if index != nil {
index.Delete(k)
if index.IsEmpty() {
delete(typeIndex, t)
}
}
}
}
// XdsCacheEntry interface defines functions that should be implemented by
// resources that can be cached.
type XdsCacheEntry interface {
// Key is the key to be used in cache.
Key() string
// DependentTypes are config types that this cache key is dependent on.
// Whenever any config of this type changes, we should invalidate this cache entry.
// Note: DependentConfigs should be preferred wherever possible.
DependentTypes() []config.GroupVersionKind
// DependentConfigs are the config items that this cache key is dependent on.
// Whenever these configs change, we should invalidate this cache entry.
DependentConfigs() []ConfigKey
// Cacheable indicates whether this entry is valid for caching. For example,
// for EDS to be cacheable, the Endpoint should have a corresponding service.
Cacheable() bool
}
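// CacheToken is derived from the start time of a push. Writes carrying a token older
// than the cache's current token are treated as stale and dropped.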
type CacheToken uint64
// XdsCache interface defines a store for caching XDS responses.
// All operations are thread safe.
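//
// A hypothetical usage sketch (cache is an XdsCache, entry is any XdsCacheEntry
// implementation, req is the current *PushRequest, and generate is an illustrative
// generation function; these names are not defined in this file):
//
//	if res, ok := cache.Get(entry); ok {
//		return res // served from cache
//	}
//	res := generate(entry)     // expensive XDS generation on a cache miss
//	cache.Add(entry, req, res) // silently dropped if a newer push already cleared the cache
//	return res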
type XdsCache interface {
// Add adds the given XdsCacheEntry with the value for the given pushContext to the cache.
// If the cache has been updated to a newer push context, the write will be dropped silently.
// This ensures stale data does not overwrite fresh data when dealing with concurrent
// writers.
Add(entry XdsCacheEntry, pushRequest *PushRequest, value *discovery.Resource)
// Get retrieves the cached value if it exists. The boolean indicates
// whether the entry exists in the cache.
Get(entry XdsCacheEntry) (*discovery.Resource, bool)
// Clear removes the cache entries that are dependent on the configs passed.
Clear(map[ConfigKey]struct{})
// ClearAll clears the entire cache.
ClearAll()
// Keys returns all currently configured keys. This is for testing/debug only.
Keys() []string
// Snapshot returns a snapshot of all keys and values. This is for testing/debug only.
Snapshot() map[string]*discovery.Resource
}
// NewXdsCache returns an instance of a cache.
func NewXdsCache() XdsCache {
cache := &lruCache{
enableAssertions: features.EnableUnsafeAssertions,
configIndex: map[ConfigKey]sets.Set{},
typesIndex: map[config.GroupVersionKind]sets.Set{},
}
cache.store = newLru(cache.evict)
return cache
}
// NewLenientXdsCache returns an instance of a cache that neither validates token-based get/set nor enables assertions.
func NewLenientXdsCache() XdsCache {
cache := &lruCache{
enableAssertions: false,
configIndex: map[ConfigKey]sets.Set{},
typesIndex: map[config.GroupVersionKind]sets.Set{},
}
cache.store = newLru(cache.evict)
return cache
}
type lruCache struct {
enableAssertions bool
store simplelru.LRUCache
// token stores the latest token of the store, used to prevent stale data overwrite.
// It is refreshed when Clear or ClearAll is called.
token CacheToken
mu sync.RWMutex
configIndex map[ConfigKey]sets.Set
typesIndex map[config.GroupVersionKind]sets.Set
}
var _ XdsCache = &lruCache{}
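// newLru builds the underlying LRU store with capacity features.XDSCacheMaxSize,
// falling back to 20000 when the configured size is not positive.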
func newLru(evictCallback simplelru.EvictCallback) simplelru.LRUCache {
sz := features.XDSCacheMaxSize
if sz <= 0 {
sz = 20000
}
l, err := simplelru.NewLRU(sz, evictCallback)
if err != nil {
panic(fmt.Errorf("invalid lru configuration: %v", err))
}
return l
}
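// evict is the LRU eviction callback: it updates the eviction metric and removes the
// evicted key from the config and type indexes.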
func (l *lruCache) evict(k interface{}, v interface{}) {
if features.EnableXDSCacheMetrics {
xdsCacheEvictions.Increment()
}
key := k.(string)
value := v.(cacheValue)
// we don't need to acquire locks, since this function is called when we write to the store
clearIndexConfig(l.configIndex, key, value.dependentConfigs)
clearIndexType(l.typesIndex, key, value.dependentTypes)
}
// assertUnchanged checks that a cache entry is not changed. This helps catch bad cache invalidation.
// We should never have a case where we overwrite an existing item with a new change. Instead, when
// config sources change, Clear/ClearAll should be called. At this point, we may get multiple writes
// because multiple writers may get cache misses concurrently, but they ought to generate identical
// configuration. This also checks that our XDS config generation is deterministic, which is a very
// important property.
func (l *lruCache) assertUnchanged(key string, existing *discovery.Resource, replacement *discovery.Resource) {
if l.enableAssertions {
if existing == nil {
// This is a new addition, not an update
return
}
// Record time so that we can correlate when the error actually happened, since the async reporting
// may be delayed
t0 := time.Now()
// This operation is really slow, which makes tests fail for unrelated reasons, so we process it async.
go func() {
if !cmp.Equal(existing, replacement, protocmp.Transform()) {
warning := fmt.Errorf("assertion failed at %v, cache entry changed but not cleared for key %v: %v\n%v\n%v",
t0, key, cmp.Diff(existing, replacement, protocmp.Transform()), existing, replacement)
panic(warning)
}
}()
}
}
func (l *lruCache) Add(entry XdsCacheEntry, pushReq *PushRequest, value *discovery.Resource) {
if !entry.Cacheable() || pushReq == nil || pushReq.Start.Equal(time.Time{}) {
return
}
// It will not overflow until the year 2262.
token := CacheToken(pushReq.Start.UnixNano())
k := entry.Key()
l.mu.Lock()
defer l.mu.Unlock()
cur, f := l.store.Get(k)
if f {
// This write is stale if it is older than the cached entry or the current cache
// token. This can happen when the cache is invalidated after we call Get; drop it
// so stale data does not overwrite fresh data.
if token < cur.(cacheValue).token || token < l.token {
return
}
if l.enableAssertions {
l.assertUnchanged(k, cur.(cacheValue).value, value)
}
}
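// Drop the write if a newer push has already written to or cleared the cache.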
if token < l.token {
return
}
// We have to make sure we evict old entries with the same key
// to prevent leaks in the index maps.
if old, ok := l.store.Get(k); ok {
l.evict(k, old)
}
dependentConfigs := entry.DependentConfigs()
dependentTypes := entry.DependentTypes()
toWrite := cacheValue{value: value, token: token, dependentConfigs: dependentConfigs, dependentTypes: dependentTypes}
l.store.Add(k, toWrite)
l.token = token
indexConfig(l.configIndex, k, dependentConfigs)
indexType(l.typesIndex, k, dependentTypes)
size(l.store.Len())
}
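// cacheValue is the value stored in the LRU: the cached resource, the token of the push
// that produced it, and the configs and types whose changes invalidate it.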
type cacheValue struct {
value *discovery.Resource
token CacheToken
dependentConfigs []ConfigKey
dependentTypes []config.GroupVersionKind
}
func (l *lruCache) Get(entry XdsCacheEntry) (*discovery.Resource, bool) {
if !entry.Cacheable() {
return nil, false
}
k := entry.Key()
l.mu.Lock()
defer l.mu.Unlock()
val, ok := l.store.Get(k)
if !ok {
miss()
return nil, false
}
cv := val.(cacheValue)
if cv.value == nil {
miss()
return nil, false
}
hit()
return cv.value, true
}
func (l *lruCache) Clear(configs map[ConfigKey]struct{}) {
l.mu.Lock()
defer l.mu.Unlock()
l.token = CacheToken(time.Now().UnixNano())
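// For every changed config, remove the entries that depend on that exact config as well
// as the entries that depend on its type.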
for ckey := range configs {
referenced := l.configIndex[ckey]
delete(l.configIndex, ckey)
for key := range referenced {
l.store.Remove(key)
}
tReferenced := l.typesIndex[ckey.Kind]
delete(l.typesIndex, ckey.Kind)
for key := range tReferenced {
l.store.Remove(key)
}
}
size(l.store.Len())
}
func (l *lruCache) ClearAll() {
l.mu.Lock()
defer l.mu.Unlock()
l.token = CacheToken(time.Now().UnixNano())
// Purge with an evict function would turn out to be pretty slow, since it runs the
// function for every key in the store; it is better to just create a new store.
l.store = newLru(l.evict)
l.configIndex = map[ConfigKey]sets.Set{}
l.typesIndex = map[config.GroupVersionKind]sets.Set{}
size(l.store.Len())
}
func (l *lruCache) Keys() []string {
l.mu.RLock()
defer l.mu.RUnlock()
iKeys := l.store.Keys()
keys := make([]string, 0, len(iKeys))
for _, ik := range iKeys {
keys = append(keys, ik.(string))
}
return keys
}
func (l *lruCache) Snapshot() map[string]*discovery.Resource {
l.mu.RLock()
defer l.mu.RUnlock()
iKeys := l.store.Keys()
res := make(map[string]*discovery.Resource, len(iKeys))
for _, ik := range iKeys {
v, ok := l.store.Get(ik)
if !ok {
continue
}
res[ik.(string)] = v.(cacheValue).value
}
return res
}
// DisabledCache is a cache that is always empty; all operations are no-ops.
type DisabledCache struct{}
var _ XdsCache = &DisabledCache{}
func (d DisabledCache) Add(key XdsCacheEntry, pushReq *PushRequest, value *discovery.Resource) {}
func (d DisabledCache) Get(XdsCacheEntry) (*discovery.Resource, bool) {
return nil, false
}
func (d DisabledCache) Clear(configsUpdated map[ConfigKey]struct{}) {}
func (d DisabledCache) ClearAll() {}
func (d DisabledCache) Keys() []string { return nil }
func (d DisabledCache) Snapshot() map[string]*discovery.Resource { return nil }