blob: 50363027851195814a08f99e76429f6b98462872 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kclient
import (
"context"
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/util/ptr"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/features"
dubbogvr "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvr"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/kubeclient"
types "github.com/apache/dubbo-kubernetes/pkg/config/schema/kubetypes"
"github.com/apache/dubbo-kubernetes/pkg/kube"
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
"github.com/apache/dubbo-kubernetes/pkg/kube/informerfactory"
"github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
"github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"sync"
dubbolog "github.com/apache/dubbo-kubernetes/pkg/log"
)
// log is the package-level logging scope, registered under the name "kclient".
var log = dubbolog.RegisterScope("kclient", "kclient debugging")
// Filter is an alias for kubetypes.Filter so callers of this package do not
// need to import kubetypes to describe which objects an informer should watch.
type Filter = kubetypes.Filter
// handlerRegistration pairs the registration handle returned by the informer
// with the handler the caller originally supplied.
type handlerRegistration struct {
// registration is the handle returned by the informer's AddEventHandler; it is
// used for HasSynced checks and for removing the handler again.
registration cache.ResourceEventHandlerRegistration
// handler is the caller's unwrapped handler. It is invoked directly when the
// dynamic object filter reports added or removed namespaces.
handler cache.ResourceEventHandler
}
// informerClient is the read-side client backed by a shared index informer.
type informerClient[T controllers.Object] struct {
// informer is the underlying shared informer whose indexer serves reads.
informer cache.SharedIndexInformer
// startInformer is invoked by Start with the caller's stop channel.
startInformer func(stopCh <-chan struct{})
// filter optionally restricts which objects are visible; a nil filter accepts
// every object.
filter func(t any) bool
// handlerMu guards registeredHandlers.
handlerMu sync.RWMutex
// registeredHandlers records every handler added through AddEventHandler that
// has not been shut down yet.
registeredHandlers []handlerRegistration
}
// fullClient combines the write operations of writeClient with the read
// operations of an Informer by embedding both. NewFiltered returns it as a
// Client[T].
type fullClient[T controllers.Object] struct {
writeClient[T]
Informer[T]
}
// writeClient issues write requests (create, update, patch, delete) for
// objects of type T.
type writeClient[T controllers.Object] struct {
// client is handed to kubeclient.GetWriteClient to obtain the typed write API.
client kube.Client
}
// internalIndex is the RawIndexer value returned by informerClient.Index.
type internalIndex struct {
// key is the name of the index within indexer.
key string
// indexer is the informer's indexer that holds the index.
indexer cache.Indexer
// filter, when non-nil, is applied to lookup results before they are returned.
filter func(t any) bool
}
// neverReady is a registration placeholder whose HasSynced always reports
// false. AddEventHandler returns it when the informer rejects the handler.
type neverReady struct{}
// New returns a Client for objects of type T with no filtering applied. It is
// shorthand for NewFiltered with a zero-value Filter.
func New[T controllers.ComparableObject](c kube.Client) Client[T] {
	var unfiltered Filter
	return NewFiltered[T](c, unfiltered)
}
// ToOpts translates a Filter into the InformerOptions used to build an
// informer for gvr on client c.
//
// When the resource is not cluster-scoped and the filter names no namespace,
// the namespace falls back to features.InformerWatchNamespace. The label
// selector, field selector and object transform are copied from the filter,
// and Cluster is set from c.ClusterID().
func ToOpts(c kube.Client, gvr schema.GroupVersionResource, filter Filter) kubetypes.InformerOptions {
	namespace := filter.Namespace
	if clusterScoped := dubbogvr.IsClusterScoped(gvr); !clusterScoped && namespace == "" {
		namespace = features.InformerWatchNamespace
	}

	opts := kubetypes.InformerOptions{
		LabelSelector:   filter.LabelSelector,
		FieldSelector:   filter.FieldSelector,
		Namespace:       namespace,
		ObjectTransform: filter.ObjectTransform,
		Cluster:         c.ClusterID(),
	}
	return opts
}
// NewMetadata returns an Informer over PartialObjectMetadata for gvr. The
// informer options are derived from filter via ToOpts, with the informer type
// forced to kubetypes.MetadataInformer.
func NewMetadata(c kube.Client, gvr schema.GroupVersionResource, filter Filter) Informer[*metav1.PartialObjectMetadata] {
	metadataOpts := ToOpts(c, gvr, filter)
	metadataOpts.InformerType = kubetypes.MetadataInformer

	return newInformerClient[*metav1.PartialObjectMetadata](
		gvr,
		kubeclient.GetInformerFilteredFromGVR(c, metadataOpts, gvr),
		filter,
	)
}
// NewUntypedInformer returns an informer client for gvr whose objects are
// exposed as controllers.Object rather than a concrete type. Informer options
// are derived from filter via ToOpts.
func NewUntypedInformer(c kube.Client, gvr schema.GroupVersionResource, filter Filter) Untyped {
	opts := ToOpts(c, gvr, filter)
	startable := kubeclient.GetInformerFilteredFromGVR(c, opts, gvr)
	return newInformerClient[controllers.Object](gvr, startable, filter)
}
// NewDelayedInformer returns an Informer for gvr whose underlying informer is
// constructed lazily through a factory closure, gated by a delayed filter
// built from the client's CRD watcher. The factory derives its options from
// filter via ToOpts and applies the requested informerType.
//
// NOTE(review): when the client has no CRD watcher this only logs at info
// level and still passes the nil watcher to newDelayedFilter — confirm that
// newDelayedFilter tolerates a nil watcher.
func NewDelayedInformer[T controllers.ComparableObject](c kube.Client, gvr schema.GroupVersionResource, informerType kubetypes.InformerType, filter Filter) Informer[T] {
	crdWatcher := c.CrdWatcher()
	if crdWatcher == nil {
		log.Info("NewDelayedInformer called without a CrdWatcher enabled")
	}
	gate := newDelayedFilter(gvr, crdWatcher)

	// build is only invoked once the delayed informer decides to create the
	// real informer; nothing is constructed here.
	build := func() informerfactory.StartableInformer {
		informerOpts := ToOpts(c, gvr, filter)
		informerOpts.InformerType = informerType
		return kubeclient.GetInformerFiltered[T](c, informerOpts, gvr)
	}
	return newDelayedInformer[T](gvr, build, gate, filter)
}
// NewFiltered returns a read/write Client for objects of type T, restricted
// by filter. The GroupVersionResource is resolved from T itself; resolution
// uses the Must* helpers, which are expected to panic for unknown types.
func NewFiltered[T controllers.ComparableObject](c kube.Client, filter Filter) Client[T] {
	gvk := types.MustGVKFromType[T]()
	gvr := types.MustToGVR[T](gvk)
	startable := kubeclient.GetInformerFiltered[T](c, ToOpts(c, gvr, filter), gvr)

	full := &fullClient[T]{
		writeClient: writeClient[T]{client: c},
		Informer:    newInformerClient[T](gvr, startable, filter),
	}
	return full
}
// newInformerClient wraps a startable informer in an informerClient. When the
// filter carries a dynamic ObjectFilter, it is wired in through
// applyDynamicFilter; otherwise the client has no object filter.
func newInformerClient[T controllers.ComparableObject](gvr schema.GroupVersionResource, inf informerfactory.StartableInformer, filter Filter) Informer[T] {
	client := &informerClient[T]{
		informer:      inf.Informer,
		startInformer: inf.Start,
	}
	if filter.ObjectFilter == nil {
		return client
	}
	applyDynamicFilter(filter, gvr, client)
	return client
}
// applyDynamicFilter installs filter.ObjectFilter as the client's object
// filter and subscribes to changes in the set of selected namespaces.
//
// When namespaces are added or removed, the registered handlers receive
// synthetic events for objects already in the informer cache:
//   - for the Namespace resource itself, every cached namespace whose name is
//     in the added set gets an OnAdd; removed namespaces produce no event;
//   - for any other resource, every cached object in an added namespace gets
//     an OnAdd and every cached object in a removed namespace gets an OnDelete.
//
// It is a no-op when filter.ObjectFilter is nil.
func applyDynamicFilter[T controllers.ComparableObject](filter Filter, gvr schema.GroupVersionResource, ic *informerClient[T]) {
	if filter.ObjectFilter == nil {
		return
	}
	ic.filter = filter.ObjectFilter.Filter

	// Both helpers fan an object out to every registered handler. They must be
	// called with ic.handlerMu held, which the callback below guarantees.
	fireAdd := func(item T) {
		for _, reg := range ic.registeredHandlers {
			reg.handler.OnAdd(item, false)
		}
	}
	fireDelete := func(item T) {
		for _, reg := range ic.registeredHandlers {
			reg.handler.OnDelete(item)
		}
	}

	filter.ObjectFilter.AddHandler(func(added, removed sets.String) {
		ic.handlerMu.RLock()
		defer ic.handlerMu.RUnlock()
		log.Infof("applyDynamicFilter: namespace filter handler triggered for %v: added=%v, removed=%v", gvr, added, removed)

		if gvr == dubbogvr.Namespace {
			// Namespaces are cluster-scoped, so list all of them and pick the
			// ones that were just selected.
			for _, item := range ic.ListUnfiltered(metav1.NamespaceAll, klabels.Everything()) {
				if added.Contains(item.GetName()) {
					log.Infof("applyDynamicFilter: triggering OnAdd for namespace %s", item.GetName())
					fireAdd(item)
				}
			}
			return
		}

		for ns := range added {
			log.Infof("applyDynamicFilter: namespace %s added, listing unfiltered objects for %v", ns, gvr)
			cached := ic.ListUnfiltered(ns, klabels.Everything())
			log.Infof("applyDynamicFilter: found %d unfiltered objects in namespace %s for %v", len(cached), ns, gvr)
			for _, item := range cached {
				log.Infof("applyDynamicFilter: triggering OnAdd for %s/%s in namespace %s", item.GetNamespace(), item.GetName(), ns)
				fireAdd(item)
			}
		}
		for ns := range removed {
			log.Infof("applyDynamicFilter: namespace %s removed, listing unfiltered objects for %v", ns, gvr)
			cached := ic.ListUnfiltered(ns, klabels.Everything())
			log.Infof("applyDynamicFilter: found %d unfiltered objects in namespace %s for %v", len(cached), ns, gvr)
			for _, item := range cached {
				log.Infof("applyDynamicFilter: triggering OnDelete for %s/%s in namespace %s", item.GetNamespace(), item.GetName(), ns)
				fireDelete(item)
			}
		}
	})
}
// List returns the cached objects in namespace that match selector and pass
// the client's object filter. Errors from the underlying lister are logged
// and whatever was collected is still returned.
//
// A summary is logged at info level for all-namespace listings, and at debug
// level for namespaced listings in which at least one object was filtered out.
func (n *informerClient[T]) List(namespace string, selector klabels.Selector) []T {
	var (
		kept    []T
		seen    int
		dropped int
	)
	visit := func(raw any) {
		seen++
		obj := raw.(T)
		if n.applyFilter(obj) {
			kept = append(kept, obj)
			return
		}
		dropped++
		// Record which object was filtered out to help diagnose missing results.
		if named, ok := any(obj).(interface {
			GetNamespace() string
			GetName() string
		}); ok {
			log.Debugf("informerClient.List: filtered out object %s/%s for namespace=%s", named.GetNamespace(), named.GetName(), namespace)
		}
	}

	if err := cache.ListAllByNamespace(n.informer.GetIndexer(), namespace, selector, visit); err != nil {
		log.Warnf("informerClient.List: lister returned err for namespace=%s: %v", namespace, err)
	}

	switch {
	case namespace == metav1.NamespaceAll:
		log.Infof("informerClient.List: namespace=%s, total=%d, filtered=%d, result=%d", namespace, seen, dropped, len(kept))
	case dropped > 0:
		log.Debugf("informerClient.List: filtered out %d items for namespace=%s (total=%d, result=%d)", dropped, namespace, seen, len(kept))
	}
	return kept
}
// ListUnfiltered returns the cached objects in namespace that match selector,
// bypassing the client's object filter. Errors from the underlying lister are
// logged and whatever was collected is still returned.
//
// For all-namespace listings the result count, the informer's sync state and
// each returned object are logged at info level.
func (n *informerClient[T]) ListUnfiltered(namespace string, selector klabels.Selector) []T {
	var all []T
	collect := func(raw any) {
		all = append(all, raw.(T))
	}
	if err := cache.ListAllByNamespace(n.informer.GetIndexer(), namespace, selector, collect); err != nil {
		log.Warnf("informerClient.ListUnfiltered: lister returned err for namespace=%s: %v", namespace, err)
	}
	if namespace != metav1.NamespaceAll {
		return all
	}

	log.Infof("informerClient.ListUnfiltered: found %d unfiltered objects for namespace=%s (synced=%v)", len(all), namespace, n.informer.HasSynced())
	for idx, obj := range all {
		named, ok := any(obj).(interface {
			GetNamespace() string
			GetName() string
		})
		if !ok {
			continue
		}
		log.Infof("informerClient.ListUnfiltered: object[%d] %s/%s", idx, named.GetNamespace(), named.GetName())
	}
	return all
}
// Start invokes the informer's start function with stopCh.
func (n *informerClient[T]) Start(stopCh <-chan struct{}) {
	start := n.startInformer
	start(stopCh)
}
// Get returns the cached object identified by name and namespace. It returns
// the empty value of T when the lookup fails, the object is not in the cache,
// or the object is rejected by the client's object filter.
func (n *informerClient[T]) Get(name, namespace string) T {
	key := keyFunc(name, namespace)
	obj, exists, err := n.informer.GetIndexer().GetByKey(key)
	if err != nil || !exists {
		return ptr.Empty[T]()
	}
	if typed := obj.(T); n.applyFilter(typed) {
		return typed
	}
	return ptr.Empty[T]()
}
// HasSynced reports whether the informer has synced and every handler
// registered through this client has synced as well.
func (n *informerClient[T]) HasSynced() bool {
	if synced := n.informer.HasSynced(); !synced {
		return false
	}

	n.handlerMu.RLock()
	defer n.handlerMu.RUnlock()
	for idx := range n.registeredHandlers {
		if !n.registeredHandlers[idx].registration.HasSynced() {
			return false
		}
	}
	return true
}
// HasSyncedIgnoringHandlers reports whether the informer itself has synced,
// without consulting the sync state of any registered handler.
func (n *informerClient[T]) HasSyncedIgnoringHandlers() bool {
	informerSynced := n.informer.HasSynced()
	return informerSynced
}
// ShutdownHandlers removes every handler registered through this client from
// the informer and forgets them.
func (n *informerClient[T]) ShutdownHandlers() {
	n.handlerMu.Lock()
	defer n.handlerMu.Unlock()

	for idx := range n.registeredHandlers {
		// Removal is best-effort; the returned error is intentionally dropped.
		_ = n.informer.RemoveEventHandler(n.registeredHandlers[idx].registration)
	}
	n.registeredHandlers = nil
}
// AddEventHandler registers h with the underlying informer and returns the
// resulting registration. Events are delivered to h only for objects accepted
// by the client's object filter; with no filter every object is accepted.
//
// The handler is recorded so that it can later be removed by ShutdownHandler
// or ShutdownHandlers and consulted by HasSynced. If the informer rejects the
// registration, the error is logged and a registration that never reports
// synced is returned.
func (n *informerClient[T]) AddEventHandler(h cache.ResourceEventHandler) cache.ResourceEventHandlerRegistration {
	// describe extracts namespace and name for log messages. Both are empty
	// when obj does not expose them (for example a deletion tombstone).
	describe := func(obj any) (string, string) {
		if named, ok := obj.(interface {
			GetNamespace() string
			GetName() string
		}); ok {
			return named.GetNamespace(), named.GetName()
		}
		return "", ""
	}

	fh := cache.FilteringResourceEventHandler{
		FilterFunc: func(obj any) bool {
			ns, name := describe(obj)
			if n.filter == nil {
				log.Debugf("informerClient.AddEventHandler: FilterFunc allowing object %s/%s (no filter)", ns, name)
				return true
			}
			// The filter accepts any value, so hand it the object as received.
			// Asserting to T here would panic when a delete event carries a
			// tombstone instead of a T.
			allowed := n.filter(obj)
			if !allowed {
				// Log when objects are filtered out to help diagnose missing events.
				log.Infof("informerClient.AddEventHandler: FilterFunc filtered out object %s/%s", ns, name)
			} else {
				log.Debugf("informerClient.AddEventHandler: FilterFunc allowing object %s/%s", ns, name)
			}
			return allowed
		},
		// The detailed variant is used so that isInInitialList reaches h
		// unchanged instead of being replaced by a constant false.
		Handler: cache.ResourceEventHandlerDetailedFuncs{
			AddFunc: func(obj any, isInInitialList bool) {
				ns, name := describe(obj)
				log.Infof("informerClient.AddEventHandler: OnAdd called for %s/%s", ns, name)
				h.OnAdd(obj, isInInitialList)
			},
			UpdateFunc: func(oldObj, newObj any) {
				ns, name := describe(newObj)
				log.Infof("informerClient.AddEventHandler: OnUpdate called for %s/%s", ns, name)
				h.OnUpdate(oldObj, newObj)
			},
			DeleteFunc: func(obj any) {
				ns, name := describe(obj)
				log.Infof("informerClient.AddEventHandler: OnDelete called for %s/%s", ns, name)
				h.OnDelete(obj)
			},
		},
	}

	n.handlerMu.Lock()
	defer n.handlerMu.Unlock()
	reg, err := n.informer.AddEventHandler(fh)
	if err != nil {
		// Surface the failure instead of dropping it; the caller still gets a
		// registration whose HasSynced is always false.
		log.Warnf("informerClient.AddEventHandler: failed to register handler: %v", err)
		return neverReady{}
	}
	n.registeredHandlers = append(n.registeredHandlers, handlerRegistration{registration: reg, handler: h})
	return reg
}
// Index ensures an index called name exists on the informer and returns a
// RawIndexer for querying it. The index keys of each object are produced by
// extract. If an index with that name is already registered it is reused and
// extract is ignored.
//
// A failure to register the index is logged; the returned RawIndexer is still
// created, and its lookups will report the missing index.
func (n *informerClient[T]) Index(name string, extract func(o T) []string) RawIndexer {
	if _, registered := n.informer.GetIndexer().GetIndexers()[name]; !registered {
		err := n.informer.AddIndexers(map[string]cache.IndexFunc{
			name: func(obj any) ([]string, error) {
				return extract(controllers.Extract[T](obj)), nil
			},
		})
		if err != nil {
			log.Warnf("informerClient.Index: failed to add indexer %q: %v", name, err)
		}
	}
	return internalIndex{
		key:     name,
		indexer: n.informer.GetIndexer(),
		filter:  n.filter,
	}
}
// ShutdownHandler forgets the handler identified by registration and removes
// it from the informer.
func (n *informerClient[T]) ShutdownHandler(registration cache.ResourceEventHandlerRegistration) {
	n.handlerMu.Lock()
	defer n.handlerMu.Unlock()

	isOther := func(candidate handlerRegistration) bool {
		return candidate.registration != registration
	}
	n.registeredHandlers = slices.FilterInPlace(n.registeredHandlers, isOther)
	// Removal is best-effort; the returned error is intentionally dropped.
	_ = n.informer.RemoveEventHandler(registration)
}
// applyFilter reports whether t is visible through this client: always true
// when no filter is configured, otherwise the filter's verdict.
func (n *informerClient[T]) applyFilter(t T) bool {
	return n.filter == nil || n.filter(t)
}
// Create submits object for creation in the object's own namespace, using a
// background context and default create options.
func (n *writeClient[T]) Create(object T) (T, error) {
	namespace := object.GetNamespace()
	return kubeclient.GetWriteClient[T](n.client, namespace).
		Create(context.Background(), object, metav1.CreateOptions{})
}
// Update submits object as an update in the object's own namespace, using a
// background context and default update options.
func (n *writeClient[T]) Update(object T) (T, error) {
	namespace := object.GetNamespace()
	return kubeclient.GetWriteClient[T](n.client, namespace).
		Update(context.Background(), object, metav1.UpdateOptions{})
}
// Patch applies data as a patch of type pt to the object identified by name
// and namespace, using a background context and default patch options.
func (n *writeClient[T]) Patch(name, namespace string, pt apitypes.PatchType, data []byte) (T, error) {
	var opts metav1.PatchOptions
	return kubeclient.GetWriteClient[T](n.client, namespace).
		Patch(context.Background(), name, pt, data, opts)
}
// PatchStatus applies data as a patch of type pt to the "status" subresource
// of the object identified by name and namespace, using a background context
// and default patch options.
func (n *writeClient[T]) PatchStatus(name, namespace string, pt apitypes.PatchType, data []byte) (T, error) {
	var opts metav1.PatchOptions
	return kubeclient.GetWriteClient[T](n.client, namespace).
		Patch(context.Background(), name, pt, data, opts, "status")
}
// ApplyStatus patches the "status" subresource of the object identified by
// name and namespace with data, sending the patch with Force enabled and
// fieldManager recorded as the field manager.
func (n *writeClient[T]) ApplyStatus(name, namespace string, pt apitypes.PatchType, data []byte, fieldManager string) (T, error) {
	opts := metav1.PatchOptions{
		Force:        ptr.Of(true),
		FieldManager: fieldManager,
	}
	writer := kubeclient.GetWriteClient[T](n.client, namespace)
	return writer.Patch(context.Background(), name, pt, data, opts, "status")
}
// UpdateStatus submits object's status as an update in the object's own
// namespace. It returns an error, together with the empty value of T, when
// the write client for T does not implement kubetypes.WriteStatusAPI.
func (n *writeClient[T]) UpdateStatus(object T) (T, error) {
	writer := kubeclient.GetWriteClient[T](n.client, object.GetNamespace())
	statusWriter, supported := writer.(kubetypes.WriteStatusAPI[T])
	if !supported {
		return ptr.Empty[T](), fmt.Errorf("%T does not support UpdateStatus", object)
	}
	return statusWriter.UpdateStatus(context.Background(), object, metav1.UpdateOptions{})
}
// Delete removes the object identified by name and namespace, using a
// background context and default delete options.
func (n *writeClient[T]) Delete(name, namespace string) error {
	var opts metav1.DeleteOptions
	return kubeclient.GetWriteClient[T](n.client, namespace).
		Delete(context.Background(), name, opts)
}
// HasSynced always reports false: a neverReady registration stands in for a
// handler that could not be registered and therefore never becomes synced.
func (neverReady) HasSynced() bool {
	const synced = false
	return synced
}
// Lookup returns the objects stored in the index under key, restricted to
// those accepted by the index's filter when one is configured.
//
// If the indexer reports an error (for example because the index was never
// registered), the error is logged and nil is returned.
func (i internalIndex) Lookup(key string) []any {
	res, err := i.indexer.ByIndex(i.key, key)
	if err != nil {
		log.Warnf("internalIndex.Lookup: lookup of key %q in index %q failed: %v", key, i.key, err)
		return nil
	}
	if i.filter == nil {
		return res
	}
	return slices.FilterInPlace(res, i.filter)
}
// keyFunc builds the cache key for an object: "namespace/name" when a
// namespace is given, or just the name when the namespace is empty.
func keyFunc(name, namespace string) string {
	switch namespace {
	case "":
		return name
	default:
		return namespace + "/" + name
	}
}