blob: 01cd35f6b8f7c4189544b6e546be36bf36a0f2dd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package krt
import (
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
"github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/ptr"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"k8s.io/client-go/tools/cache"
)
// Index maps keys of type K to the objects of type O that produce that key.
type Index[K comparable, O any] interface {
// Lookup returns the objects associated with key k.
Lookup(k K) []O
// AsCollection exposes the index as a Collection of IndexObject values.
AsCollection(opts ...CollectionOption) Collection[IndexObject[K, O]]
// objectHasKey reports whether k is one of the keys extracted from obj.
objectHasKey(obj O, k K) bool
// extractKeys returns the keys extracted from o.
extractKeys(o O) []K
// id returns the unique identifier of this index.
id() collectionUID
}
// IndexObject pairs an index key with the objects found under that key.
// It is the element type of the collection returned by Index.AsCollection.
type IndexObject[K comparable, O any] struct {
// Key is the index key.
Key K
// Objects holds the objects looked up for Key.
Objects []O
}
// index is the Index implementation returned by NewIndex.
// NewIndex builds it with an unkeyed literal, so field order matters.
type index[K comparable, O any] struct {
// uid is the identifier returned by id().
uid collectionUID
// indexer is the string-keyed index obtained from the source collection.
indexer[O]
// c is the collection being indexed.
c Collection[O]
// extract computes the typed keys for an object.
extract func(o O) []K
}
// indexCollection presents an index as a Collection[IndexObject[K, O]].
type indexCollection[K comparable, O any] struct {
// idx is the index being exposed.
idx index[K, O]
// id is the identifier returned by uid().
id collectionUID
// metadata is the value returned by Metadata().
metadata Metadata
// collectionName is the value returned by name().
// nolint: unused // (not true, its to implement an interface)
collectionName string
// fromKey converts a string key back into the typed key K (returned as any).
fromKey func(string) any
}
// NewIndex creates an Index over collection c, registered under name.
// extract yields the typed keys for an object; every key is converted with
// toString before being handed to the collection's string-keyed index.
// It panics if c does not implement internalCollection[O].
func NewIndex[K comparable, O any](c Collection[O], name string, extract func(o O) []K) Index[K, O] {
	// The underlying index is string-keyed, so adapt the typed extractor.
	stringKeys := func(obj O) []string {
		typed := extract(obj)
		return slices.Map(typed, func(key K) string { return toString(key) })
	}
	underlying := c.(internalCollection[O]).index(name, stringKeys)
	return index[K, O]{
		uid:     nextUID(),
		indexer: underlying,
		c:       c,
		extract: extract,
	}
}
// NewNamespaceIndex is a small helper that indexes a collection by the
// namespace of each object. The index is registered under cache.NamespaceIndex.
func NewNamespaceIndex[O Namespacer](c Collection[O]) Index[string, O] {
	byNamespace := func(obj O) []string {
		ns := obj.GetNamespace()
		return []string{ns}
	}
	return NewIndex(c, cache.NamespaceIndex, byNamespace)
}
// AsCollection exposes the index as a Collection[IndexObject[K, O]].
// The collection is named "index/<name>" using the name from opts, and takes
// its metadata from opts when one is provided.
// It panics when the value produced by ptr.Empty[K]() is not a string, because
// keys must be recoverable from their string form.
func (i index[K, O]) AsCollection(opts ...CollectionOption) Collection[IndexObject[K, O]] {
	options := buildCollectionOptions(opts...)
	result := indexCollection[K, O]{
		idx:            i,
		id:             nextUID(),
		collectionName: fmt.Sprintf("index/%s", options.name),
	}
	// NOTE(review): fromKey is not populated from the options above, so this
	// branch is always taken — confirm whether an option was meant to set it.
	if result.fromKey == nil {
		_, keyIsString := any(ptr.Empty[K]()).(string)
		if !keyIsString {
			// This is a limitation of the way the API is encoded, unfortunately.
			panic("index.AsCollection requires a string key or WithIndexCollectionFromString to be set")
		}
		// String keys need no conversion.
		result.fromKey = func(s string) any { return s }
	}
	if md := options.metadata; md != nil {
		result.metadata = md
	}
	return result
}
// objectHasKey reports whether k is among the keys extracted from obj.
// nolint: unused // (not true)
func (i index[K, O]) objectHasKey(obj O, k K) bool {
	keys := i.extract(obj)
	for idx := range keys {
		if keys[idx] == k {
			return true
		}
	}
	return false
}
// extractKeys returns the keys produced by the index's extract function for o.
// nolint: unused // (not true)
func (i index[K, O]) extractKeys(o O) []K {
return i.extract(o)
}
// id returns the unique identifier stored on this index.
// nolint: unused // (not true)
func (i index[K, O]) id() collectionUID {
return i.uid
}
// Lookup finds all objects matching a given key.
// The key is converted with toString before querying the underlying indexer.
// It returns nil when the index has no underlying indexer.
func (i index[K, O]) Lookup(k K) []O {
	if i.indexer != nil {
		return i.indexer.Lookup(toString(k))
	}
	return nil
}
// name returns the collection name stored on this index collection.
// nolint: unused // (not true, its to implement an interface)
func (i indexCollection[K, O]) name() string {
return i.collectionName
}
// uid returns the unique identifier stored on this index collection.
// nolint: unused // (not true, its to implement an interface)
func (i indexCollection[K, O]) uid() collectionUID {
return i.id
}
// dump builds a CollectionDump for this index collection: the best-effort
// output from dumpOutput, the name of the indexed input collection, and the
// current sync state.
// It panics if the indexed collection does not implement internalCollection[O].
// nolint: unused // (not true, its to implement an interface)
func (i indexCollection[K, O]) dump() CollectionDump {
	outputs := i.dumpOutput()
	input := i.idx.c.(internalCollection[O]).name()
	synced := i.HasSynced()
	return CollectionDump{
		Outputs:         outputs,
		InputCollection: input,
		Synced:          synced,
	}
}
// augment returns a unchanged; index collections apply no augmentation.
// nolint: unused // (not true, its to implement an interface)
func (i indexCollection[K, O]) augment(a any) any {
return a
}
// index always panics: an index collection cannot itself be indexed.
// Both arguments are ignored.
// nolint: unused // (not true, its to implement an interface)
func (i indexCollection[K, O]) index(name string, extract func(o IndexObject[K, O]) []string) indexer[IndexObject[K, O]] {
panic("an index cannot be indexed")
}
// GetKey converts the string key k to its typed form with fromKey and returns
// the IndexObject holding every object currently found under it.
// The result is never nil, even when no objects match.
// It panics if fromKey yields a value that is not of type K.
func (i indexCollection[K, O]) GetKey(k string) *IndexObject[K, O] {
	typedKey := i.fromKey(k).(K)
	entry := IndexObject[K, O]{Key: typedKey}
	entry.Objects = i.idx.Lookup(typedKey)
	return &entry
}
// List always panics: an index collection does not support listing.
func (i indexCollection[K, O]) List() []IndexObject[K, O] {
panic("an index collection cannot be listed")
}
// dumpOutput dumps the current state, keyed by the string form of each index key.
// It has no synchronization, so the result is best-effort only: this will not
// cause a Go-level data race, but the information may be inconsistent.
// nolint: unused // (not true...)
func (i indexCollection[K, O]) dumpOutput() map[string]any {
	// Collect every key produced by the objects currently in the indexed collection.
	seen := sets.New[K]()
	for _, obj := range i.idx.c.List() {
		seen.InsertAll(i.idx.extractKeys(obj)...)
	}
	out := make(map[string]any, len(seen))
	for key := range seen {
		name := toString(key)
		out[name] = *i.GetKey(name)
	}
	return out
}
// WaitUntilSynced delegates to the indexed collection's WaitUntilSynced,
// passing stop through and returning its result.
func (i indexCollection[K, O]) WaitUntilSynced(stop <-chan struct{}) bool {
return i.idx.c.WaitUntilSynced(stop)
}
// HasSynced reports the sync state of the indexed collection.
func (i indexCollection[K, O]) HasSynced() bool {
return i.idx.c.HasSynced()
}
// Metadata returns the metadata stored on this index collection.
func (i indexCollection[K, O]) Metadata() Metadata {
return i.metadata
}
// Register subscribes f to events on the index collection, one event per call.
// It is a thin wrapper over RegisterBatch with runExistingState set to true;
// each batch is unrolled and delivered to f in order.
func (i indexCollection[K, O]) Register(f func(o Event[IndexObject[K, O]])) HandlerRegistration {
	perEvent := func(batch []Event[IndexObject[K, O]]) {
		for idx := range batch {
			f(batch[idx])
		}
	}
	return i.RegisterBatch(perEvent, true)
}
// RegisterBatch subscribes f to batched events on the index collection.
// It registers a handler on the indexed collection (forwarding runExistingState).
// For each upstream batch it gathers every key extracted from the old and new
// objects, re-reads each key's current contents, and emits one downstream event
// per key: a delete when the key has no objects left, otherwise an add.
func (i indexCollection[K, O]) RegisterBatch(f func(o []Event[IndexObject[K, O]]), runExistingState bool) HandlerRegistration {
	translate := func(upstream []Event[O]) {
		touched := sets.New[K]()
		// collect records the keys of one side of an event, if that side is present.
		collect := func(obj *O) {
			if obj != nil {
				touched.InsertAll(i.idx.extractKeys(*obj)...)
			}
		}
		for _, ev := range upstream {
			collect(ev.Old)
			collect(ev.New)
		}

		downstream := make([]Event[IndexObject[K, O]], 0, len(touched))
		for key := range touched {
			current := i.GetKey(toString(key))
			// Due to the semantics around indexes, we cannot reasonably compute the exact event type.
			// However, we don't really need to: simply triggering an Add/Delete is close enough to work.
			// Building a collection from an indexCollection only uses the events to determine the
			// changed keys, which is available with this information.
			var out Event[IndexObject[K, O]]
			if len(current.Objects) > 0 {
				out = Event[IndexObject[K, O]]{
					New:   current,
					Event: controllers.EventAdd,
				}
			} else {
				out = Event[IndexObject[K, O]]{
					Old:   &IndexObject[K, O]{Key: key, Objects: nil},
					Event: controllers.EventDelete,
				}
			}
			downstream = append(downstream, out)
		}
		f(downstream)
	}
	return i.idx.c.RegisterBatch(translate, runExistingState)
}
// ResourceName returns the string form of the object's Key, as produced by toString.
func (i IndexObject[K, O]) ResourceName() string {
return toString(i.Key)
}
// toString converts an index key to its string form.
// A string is returned as-is; any other value must implement fmt.Stringer and
// is rendered with String(). It panics for values that are neither.
func toString(rk any) string {
	if s, isString := rk.(string); isString {
		return s
	}
	return rk.(fmt.Stringer).String()
}