// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
"fmt"
"sort"
"sync"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider"
"github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/gvk"
"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)
// shardRegistry is a simplified interface for registries that can produce a shard key
type shardRegistry interface {
Cluster() cluster.ID
Provider() provider.ID
}
// ShardKeyFromRegistry computes the shard key based on provider type and cluster id.
func ShardKeyFromRegistry(instance shardRegistry) ShardKey {
return ShardKey{Cluster: instance.Cluster(), Provider: instance.Provider()}
}
// ShardKey is the key for EndpointShards, rendered as a string in the format "provider/cluster".
type ShardKey struct {
Cluster cluster.ID
Provider provider.ID
}
func (sk ShardKey) String() string {
return fmt.Sprintf("%s/%s", sk.Provider, sk.Cluster)
}
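
// For illustration, a hedged sketch of the rendered key (provider.Kubernetes is an assumption,
// mirroring the upstream Istio provider IDs):
//
//	key := ShardKey{Cluster: "cluster-1", Provider: provider.Kubernetes}
//	_ = key.String() // "Kubernetes/cluster-1"
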
// EndpointShards holds the set of endpoint shards of a service. Registries update
// individual shards incrementally. The shards are aggregated and split into
// clusters when a push for the specific cluster is needed.
type EndpointShards struct {
// mutex protecting the fields below.
sync.RWMutex
// Shards is used to track the shards. EDS updates are grouped by shard.
// The key identifies the source registry by cluster ID and provider; in multicluster this
// is the name of the k8s cluster, derived from the config (secret). See the sketch after
// this type for the conceptual layout.
Shards map[ShardKey][]*IstioEndpoint
// ServiceAccounts holds the union of all service accounts seen so far in endpoints.
// It is updated on push, based on the shards. If the new set differs from the previous
// one, a full push is forced to trigger a secure naming update.
// Because updating CDS takes time, connection errors may still occur while the update
// propagates.
ServiceAccounts sets.Set
}
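
// Conceptually, with two Kubernetes clusters reporting endpoints for the same service, the
// Shards map looks like the sketch below (hypothetical data; provider.Kubernetes assumed as
// in upstream Istio):
//
//	Shards = map[ShardKey][]*IstioEndpoint{
//		{Cluster: "cluster-1", Provider: provider.Kubernetes}: {ep1, ep2},
//		{Cluster: "cluster-2", Provider: provider.Kubernetes}: {ep3},
//	}
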
// Keys gives a sorted list of keys for EndpointShards.Shards.
// Calls to Keys should be guarded with a lock on the EndpointShards.
func (es *EndpointShards) Keys() []ShardKey {
// len(es.Shards) is roughly the number of remote clusters, which isn't too large, so sorting
// here on every call shouldn't be a problem. If it becomes one, we can cache the sorted keys
// in the EndpointShards struct.
keys := make([]ShardKey, 0, len(es.Shards))
for k := range es.Shards {
keys = append(keys, k)
}
if len(keys) >= 2 {
sort.Slice(keys, func(i, j int) bool {
if keys[i].Provider == keys[j].Provider {
return keys[i].Cluster < keys[j].Cluster
}
return keys[i].Provider < keys[j].Provider
})
}
return keys
}
// EndpointIndex is a mutex-protected index of endpoint shards.
type EndpointIndex struct {
mu sync.RWMutex
// keyed by svc then ns
shardsBySvc map[string]map[string]*EndpointShards
// The cache must be cleared in sync with endpoint shard modifications; see the wiring
// sketch after SetCache.
cache XdsCache
}
func NewEndpointIndex() *EndpointIndex {
return &EndpointIndex{
shardsBySvc: make(map[string]map[string]*EndpointShards),
}
}
func (e *EndpointIndex) SetCache(cache XdsCache) {
e.mu.Lock()
defer e.mu.Unlock()
e.cache = cache
}
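
// Typical wiring, as a sketch (idx and xdsCache are hypothetical names; the concrete
// XdsCache implementation is provided elsewhere):
//
//	idx := NewEndpointIndex()
//	idx.SetCache(xdsCache) // optional; once set, shard mutations clear the affected cache entries
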
// must be called with e.mu held
func (e *EndpointIndex) clearCacheForService(svc, ns string) {
if e.cache == nil {
return
}
e.cache.Clear(map[ConfigKey]struct{}{{
Kind: gvk.ServiceEntry,
Name: svc,
Namespace: ns,
}: {}})
}
// Shardz returns a copy of the global map of shards but does NOT copy the underlying
// individual EndpointShards, so callers must still take each EndpointShards' own lock
// before reading or writing its fields.
func (e *EndpointIndex) Shardz() map[string]map[string]*EndpointShards {
e.mu.RLock()
defer e.mu.RUnlock()
out := make(map[string]map[string]*EndpointShards, len(e.shardsBySvc))
for svcKey, v := range e.shardsBySvc {
out[svcKey] = make(map[string]*EndpointShards, len(v))
for nsKey, shards := range v {
out[svcKey][nsKey] = shards
}
}
return out
}
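
// Illustrative read-only traversal of the copy returned by Shardz (a sketch; idx is a
// hypothetical *EndpointIndex, and each EndpointShards is still shared, so it is locked
// before reading):
//
//	for svc, byNs := range idx.Shardz() {
//		for ns, shards := range byNs {
//			shards.RLock()
//			fmt.Printf("%s/%s has shards %v\n", ns, svc, shards.Keys())
//			shards.RUnlock()
//		}
//	}
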
// ShardsForService returns the shards for the given service and true if they exist, or nil and false otherwise.
func (e *EndpointIndex) ShardsForService(serviceName, namespace string) (*EndpointShards, bool) {
e.mu.RLock()
defer e.mu.RUnlock()
byNs, ok := e.shardsBySvc[serviceName]
if !ok {
return nil, false
}
shards, ok := byNs[namespace]
return shards, ok
}
// GetOrCreateEndpointShard returns the shards for the given service, creating them if needed.
// The second return value is true if this service was seen for the first time. See the usage
// sketch after this function.
func (e *EndpointIndex) GetOrCreateEndpointShard(serviceName, namespace string) (*EndpointShards, bool) {
e.mu.Lock()
defer e.mu.Unlock()
if _, exists := e.shardsBySvc[serviceName]; !exists {
e.shardsBySvc[serviceName] = map[string]*EndpointShards{}
}
if ep, exists := e.shardsBySvc[serviceName][namespace]; exists {
return ep, false
}
// This endpoint is for a service that was not previously loaded.
ep := &EndpointShards{
Shards: map[ShardKey][]*IstioEndpoint{},
ServiceAccounts: sets.Set{},
}
e.shardsBySvc[serviceName][namespace] = ep
// Clear the cache here to avoid races with concurrent cache writes.
e.clearCacheForService(serviceName, namespace)
return ep, true
}
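
// A minimal usage sketch for a registry pushing endpoints (idx, the service name, the
// endpoint slice and provider.Kubernetes are assumptions for illustration only):
//
//	shards, created := idx.GetOrCreateEndpointShard("reviews.default.svc.cluster.local", "default")
//	key := ShardKey{Cluster: "cluster-1", Provider: provider.Kubernetes}
//	shards.Lock()
//	shards.Shards[key] = istioEndpoints // []*IstioEndpoint built by the registry
//	shards.Unlock()
//	_ = created // a service seen for the first time typically triggers a full push
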
// DeleteServiceShard removes the given shard's endpoints for one service. If preserveKeys is
// true, the (service, namespace) entry is kept in the index even when no shards remain.
func (e *EndpointIndex) DeleteServiceShard(shard ShardKey, serviceName, namespace string, preserveKeys bool) {
e.mu.Lock()
defer e.mu.Unlock()
e.deleteServiceInner(shard, serviceName, namespace, preserveKeys)
}
// DeleteShard removes all endpoints for the given shard key across every service, then
// clears the whole cache.
func (e *EndpointIndex) DeleteShard(shardKey ShardKey) {
e.mu.Lock()
defer e.mu.Unlock()
for svc, shardsByNamespace := range e.shardsBySvc {
for ns := range shardsByNamespace {
e.deleteServiceInner(shardKey, svc, ns, false)
}
}
if e.cache == nil {
return
}
e.cache.ClearAll()
}
// must be called with e.mu held
func (e *EndpointIndex) deleteServiceInner(shard ShardKey, serviceName, namespace string, preserveKeys bool) {
if e.shardsBySvc[serviceName] == nil ||
e.shardsBySvc[serviceName][namespace] == nil {
return
}
epShards := e.shardsBySvc[serviceName][namespace]
epShards.Lock()
delete(epShards.Shards, shard)
// Rebuild the set of service accounts from the shards that remain.
epShards.ServiceAccounts = sets.Set{}
for _, endpoints := range epShards.Shards {
for _, ep := range endpoints {
if ep.ServiceAccount != "" {
epShards.ServiceAccounts.Insert(ep.ServiceAccount)
}
}
}
// Clear the cache here to avoid races with concurrent cache writes.
e.clearCacheForService(serviceName, namespace)
if !preserveKeys {
if len(epShards.Shards) == 0 {
delete(e.shardsBySvc[serviceName], namespace)
}
if len(e.shardsBySvc[serviceName]) == 0 {
delete(e.shardsBySvc, serviceName)
}
}
epShards.Unlock()
}