| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package krt |
| |
| import ( |
| "fmt" |
| "github.com/apache/dubbo-kubernetes/pkg/kube/controllers" |
| "github.com/apache/dubbo-kubernetes/pkg/slices" |
| "github.com/apache/dubbo-kubernetes/pkg/util/ptr" |
| "github.com/apache/dubbo-kubernetes/pkg/util/sets" |
| "k8s.io/client-go/tools/cache" |
| ) |
| |
| type Index[K comparable, O any] interface { |
| Lookup(k K) []O |
| AsCollection(opts ...CollectionOption) Collection[IndexObject[K, O]] |
| objectHasKey(obj O, k K) bool |
| extractKeys(o O) []K |
| id() collectionUID |
| } |
| |
| type IndexObject[K comparable, O any] struct { |
| Key K |
| Objects []O |
| } |
| |
| type index[K comparable, O any] struct { |
| uid collectionUID |
| indexer[O] |
| c Collection[O] |
| extract func(o O) []K |
| } |
| |
| type indexCollection[K comparable, O any] struct { |
| idx index[K, O] |
| id collectionUID |
| metadata Metadata |
| // nolint: unused // (not true, its to implement an interface) |
| collectionName string |
| fromKey func(string) any |
| } |
| |
| func NewIndex[K comparable, O any](c Collection[O], name string, extract func(o O) []K) Index[K, O] { |
| idx := c.(internalCollection[O]).index(name, func(o O) []string { |
| return slices.Map(extract(o), func(e K) string { |
| return toString(e) |
| }) |
| }) |
| |
| return index[K, O]{ |
| nextUID(), |
| idx, |
| c, |
| extract, |
| } |
| } |
| |
| // NewNamespaceIndex is a small helper to index a collection by namespace |
| func NewNamespaceIndex[O Namespacer](c Collection[O]) Index[string, O] { |
| return NewIndex(c, cache.NamespaceIndex, func(o O) []string { |
| return []string{o.GetNamespace()} |
| }) |
| } |
| |
| func (i index[K, O]) AsCollection(opts ...CollectionOption) Collection[IndexObject[K, O]] { |
| o := buildCollectionOptions(opts...) |
| |
| c := indexCollection[K, O]{ |
| idx: i, |
| id: nextUID(), |
| collectionName: fmt.Sprintf("index/%s", o.name), |
| } |
| if c.fromKey == nil { |
| if _, ok := any(ptr.Empty[K]()).(string); !ok { |
| // This is a limitation of the way the API is encoded, unfortunately. |
| panic("index.AsCollection requires a string key or WithIndexCollectionFromString to be set") |
| } |
| c.fromKey = func(s string) any { |
| return s |
| } |
| } |
| if o.metadata != nil { |
| c.metadata = o.metadata |
| } |
| return c |
| } |
| |
| // nolint: unused // (not true) |
| func (i index[K, O]) objectHasKey(obj O, k K) bool { |
| for _, got := range i.extract(obj) { |
| if got == k { |
| return true |
| } |
| } |
| return false |
| } |
| |
| // nolint: unused // (not true) |
| func (i index[K, O]) extractKeys(o O) []K { |
| return i.extract(o) |
| } |
| |
| // nolint: unused // (not true) |
| func (i index[K, O]) id() collectionUID { |
| return i.uid |
| } |
| |
| // Lookup finds all objects matching a given key |
| func (i index[K, O]) Lookup(k K) []O { |
| if i.indexer == nil { |
| return nil |
| } |
| return i.indexer.Lookup(toString(k)) |
| } |
| |
| // nolint: unused // (not true, its to implement an interface) |
| func (i indexCollection[K, O]) name() string { |
| return i.collectionName |
| } |
| |
| // nolint: unused // (not true, its to implement an interface) |
| func (i indexCollection[K, O]) uid() collectionUID { |
| return i.id |
| } |
| |
| // nolint: unused // (not true, its to implement an interface) |
| func (i indexCollection[K, O]) dump() CollectionDump { |
| return CollectionDump{ |
| Outputs: i.dumpOutput(), |
| InputCollection: i.idx.c.(internalCollection[O]).name(), |
| Synced: i.HasSynced(), |
| } |
| } |
| |
| // nolint: unused // (not true, its to implement an interface) |
| func (i indexCollection[K, O]) augment(a any) any { |
| return a |
| } |
| |
| // nolint: unused // (not true, its to implement an interface) |
| func (i indexCollection[K, O]) index(name string, extract func(o IndexObject[K, O]) []string) indexer[IndexObject[K, O]] { |
| panic("an index cannot be indexed") |
| } |
| |
| func (i indexCollection[K, O]) GetKey(k string) *IndexObject[K, O] { |
| tk := i.fromKey(k).(K) |
| objs := i.idx.Lookup(tk) |
| return &IndexObject[K, O]{ |
| Key: tk, |
| Objects: objs, |
| } |
| } |
| |
| func (i indexCollection[K, O]) List() []IndexObject[K, O] { |
| panic("an index collection cannot be listed") |
| } |
| |
| // dumpOutput dumps the current state. This has no synchronization, so it's not perfect. |
| // This will not result in a Go level data-race, but can give incorrect information so is best-effort only. |
| // nolint: unused // (not true...) |
| func (i indexCollection[K, O]) dumpOutput() map[string]any { |
| o := i.idx.c.List() |
| keys := sets.New[K]() |
| for _, oo := range o { |
| keys.InsertAll(i.idx.extractKeys(oo)...) |
| } |
| res := map[string]any{} |
| for k := range keys { |
| ks := toString(k) |
| res[ks] = *i.GetKey(ks) |
| } |
| return res |
| } |
| |
| func (i indexCollection[K, O]) WaitUntilSynced(stop <-chan struct{}) bool { |
| return i.idx.c.WaitUntilSynced(stop) |
| } |
| |
| func (i indexCollection[K, O]) HasSynced() bool { |
| return i.idx.c.HasSynced() |
| } |
| |
| func (i indexCollection[K, O]) Metadata() Metadata { |
| return i.metadata |
| } |
| |
| func (i indexCollection[K, O]) Register(f func(o Event[IndexObject[K, O]])) HandlerRegistration { |
| return i.RegisterBatch(func(events []Event[IndexObject[K, O]]) { |
| for _, o := range events { |
| f(o) |
| } |
| }, true) |
| } |
| |
| func (i indexCollection[K, O]) RegisterBatch(f func(o []Event[IndexObject[K, O]]), runExistingState bool) HandlerRegistration { |
| return i.idx.c.RegisterBatch(func(o []Event[O]) { |
| allKeys := sets.New[K]() |
| for _, ev := range o { |
| if ev.Old != nil { |
| allKeys.InsertAll(i.idx.extractKeys(*ev.Old)...) |
| } |
| if ev.New != nil { |
| allKeys.InsertAll(i.idx.extractKeys(*ev.New)...) |
| } |
| } |
| downstream := make([]Event[IndexObject[K, O]], 0, len(allKeys)) |
| for key := range allKeys { |
| v := i.GetKey(toString(key)) |
| // Due to the semantics around indexes, we cannot reasonably compute exactly correctly. |
| // However, we don't really need to: simply triggering an Add/Delete is close enough to work. |
| // Building a collection from an indexCollection only uses the events to determine the changed keys, which is |
| // available with this information. |
| if len(v.Objects) == 0 { |
| downstream = append(downstream, Event[IndexObject[K, O]]{ |
| Old: &IndexObject[K, O]{Key: key, Objects: nil}, |
| Event: controllers.EventDelete, |
| }) |
| } else { |
| downstream = append(downstream, Event[IndexObject[K, O]]{ |
| New: v, |
| Event: controllers.EventAdd, |
| }) |
| } |
| } |
| f(downstream) |
| }, runExistingState) |
| } |
| |
| func (i IndexObject[K, O]) ResourceName() string { |
| return toString(i.Key) |
| } |
| |
| func toString(rk any) string { |
| tk, ok := rk.(string) |
| if !ok { |
| return rk.(fmt.Stringer).String() |
| } |
| return tk |
| } |