blob: 9a76c8bce7ade95bf1f08ad6885eb6adde278bce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package krt
import (
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/util/ptr"
"sync"
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
"github.com/apache/dubbo-kubernetes/pkg/maps"
"github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
)
type staticList[T any] struct {
mu sync.RWMutex
vals map[string]T
eventHandlers *handlerSet[T]
id collectionUID
stop <-chan struct{}
collectionName string
syncer Syncer
metadata Metadata
indexes map[string]staticListIndex[T]
}
// nolint: unused // (not true)
type staticListIndex[T any] struct {
extract func(o T) []string
index map[string]sets.Set[string]
parent *staticList[T]
}
type StaticCollection[T any] struct {
*staticList[T]
}
func NewStaticCollection[T any](synced Syncer, vals []T, opts ...CollectionOption) StaticCollection[T] {
o := buildCollectionOptions(opts...)
if o.name == "" {
o.name = fmt.Sprintf("Static[%v]", ptr.TypeName[T]())
}
res := make(map[string]T, len(vals))
for _, v := range vals {
res[GetKey(v)] = v
}
if synced == nil {
synced = alwaysSynced{}
}
sl := &staticList[T]{
eventHandlers: newHandlerSet[T](),
vals: res,
id: nextUID(),
stop: o.stop,
collectionName: o.name,
syncer: synced,
indexes: make(map[string]staticListIndex[T]),
}
if o.metadata != nil {
sl.metadata = o.metadata
}
c := StaticCollection[T]{
staticList: sl,
}
maybeRegisterCollectionForDebugging[T](c, o.debugger)
return c
}
func (s StaticCollection[T]) Reset(newState []T) {
s.mu.Lock()
defer s.mu.Unlock()
var updates []Event[T]
nv := map[string]T{}
for _, incoming := range newState {
k := GetKey(incoming)
nv[k] = incoming
if old, f := s.vals[k]; f {
if !Equal(old, incoming) {
ev := Event[T]{
Old: &old,
New: &incoming,
Event: controllers.EventUpdate,
}
for _, index := range s.indexes {
index.update(ev, k)
}
updates = append(updates, ev)
}
} else {
ev := Event[T]{
New: &incoming,
Event: controllers.EventAdd,
}
for _, index := range s.indexes {
index.update(ev, k)
}
updates = append(updates, ev)
}
delete(s.vals, k)
}
for k, remaining := range s.vals {
for _, index := range s.indexes {
index.delete(remaining, k)
}
updates = append(updates, Event[T]{
Old: &remaining,
Event: controllers.EventDelete,
})
}
s.vals = nv
if len(updates) > 0 {
s.eventHandlers.Distribute(updates, false)
}
}
func (s *staticList[T]) Register(f func(o Event[T])) HandlerRegistration {
return registerHandlerAsBatched(s, f)
}
// nolint: unused // (not true, its to implement an interface)
func (s *staticList[T]) name() string {
return s.collectionName
}
// nolint: unused // (not true, its to implement an interface)
func (s *staticList[T]) uid() collectionUID {
return s.id
}
// nolint: unused // (not true, its to implement an interface)
func (s *staticList[T]) dump() CollectionDump {
return CollectionDump{
Outputs: eraseMap(slices.GroupUnique(s.List(), getTypedKey)),
Synced: s.HasSynced(),
}
}
func (s *staticList[T]) index(name string, extract func(o T) []string) indexer[T] {
s.mu.Lock()
defer s.mu.Unlock()
if idx, ok := s.indexes[name]; ok {
return idx
}
idx := staticListIndex[T]{
extract: extract,
index: make(map[string]sets.Set[string]),
parent: s,
}
for k, v := range s.vals {
idx.update(Event[T]{
Old: nil,
New: &v,
Event: controllers.EventAdd,
}, k)
}
s.indexes[name] = idx
return idx
}
// nolint: unused // (not true, its to implement an interface)
func (s *staticList[T]) augment(a any) any {
return a
}
func (s *staticList[T]) HasSynced() bool {
return s.syncer.HasSynced()
}
func (s *staticList[T]) Synced() Syncer {
// We are always synced in the static collection since the initial state must be provided upfront
return alwaysSynced{}
}
func (s *staticList[T]) GetKey(k string) *T {
s.mu.RLock()
defer s.mu.RUnlock()
if o, f := s.vals[k]; f {
return &o
}
return nil
}
func (s *staticList[T]) Metadata() Metadata {
return s.metadata
}
func (s *staticList[T]) WaitUntilSynced(stop <-chan struct{}) bool {
return s.syncer.WaitUntilSynced(stop)
}
func (s *staticList[T]) RegisterBatch(f func(o []Event[T]), runExistingState bool) HandlerRegistration {
s.mu.Lock()
defer s.mu.Unlock()
var objs []Event[T]
if runExistingState {
for _, v := range s.vals {
objs = append(objs, Event[T]{
New: &v,
Event: controllers.EventAdd,
})
}
}
return s.eventHandlers.Insert(f, s.Synced(), objs, s.stop)
}
func (s *staticList[T]) List() []T {
s.mu.RLock()
defer s.mu.RUnlock()
return maps.Values(s.vals)
}
func (s staticListIndex[T]) update(ev Event[T], oKey string) {
if ev.Old != nil {
s.delete(*ev.Old, oKey)
}
if ev.New != nil {
newIndexKeys := s.extract(*ev.New)
for _, newIndexKey := range newIndexKeys {
sets.InsertOrNew(s.index, newIndexKey, oKey)
}
}
}
func (s staticListIndex[T]) delete(o T, oKey string) {
oldIndexKeys := s.extract(o)
for _, oldIndexKey := range oldIndexKeys {
sets.DeleteCleanupLast(s.index, oldIndexKey, oKey)
}
}
func (s staticListIndex[T]) Lookup(key string) []T {
s.parent.mu.RLock()
defer s.parent.mu.RUnlock()
keys := s.index[key]
res := make([]T, 0, len(keys))
for k := range keys {
v, f := s.parent.vals[k]
if !f {
continue
}
res = append(res, v)
}
return res
}