fix krt and xds api (#822)
* fix krt and xds api: register config event handlers after the config controller starts, access the dubbo.apache.org CRDs through the dynamic client, and build subset clusters and weighted routes from SubsetRule/ServiceRoute
* fix ci
* fix ci
diff --git a/dubbod/planet/pkg/bootstrap/server.go b/dubbod/planet/pkg/bootstrap/server.go
index e297679..aa363d4 100644
--- a/dubbod/planet/pkg/bootstrap/server.go
+++ b/dubbod/planet/pkg/bootstrap/server.go
@@ -47,6 +47,7 @@
"github.com/apache/dubbo-kubernetes/pkg/config/constants"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
"github.com/apache/dubbo-kubernetes/pkg/ctrlz"
"github.com/apache/dubbo-kubernetes/pkg/filewatcher"
"github.com/apache/dubbo-kubernetes/pkg/h2c"
@@ -252,8 +253,7 @@
}
}
- s.initRegistryEventHandlers()
-
+ // Note: initRegistryEventHandlers is called in Start() after the config controller starts
s.initDiscoveryService()
s.startCA(caOpts)
@@ -284,6 +284,10 @@
s.XDSServer.CachesSynced()
+ // Register event handlers after config controller has started and synced
+ // This ensures that config changes are properly detected and handled
+ s.initRegistryEventHandlers()
+
if s.secureGrpcAddress != "" {
grpcListener, err := net.Listen("tcp", s.secureGrpcAddress)
if err != nil {
@@ -424,13 +428,84 @@
func (s *Server) initRegistryEventHandlers() {
log.Info("initializing registry event handlers")
- if s.configController != nil {
- configHandler := func(prev config.Config, curr config.Config, event model.Event) {}
- schemas := collections.Planet.All()
- for _, schema := range schemas {
- s.configController.RegisterEventHandler(schema.GroupVersionKind(), configHandler)
- }
+ if s.configController == nil {
+ log.Warnf("initRegistryEventHandlers: configController is nil, cannot register event handlers")
+ return
}
+
+ log.Infof("initRegistryEventHandlers: configController is available, registering event handlers")
+
+ configHandler := func(prev config.Config, curr config.Config, event model.Event) {
+ // Log ALL events at INFO level to ensure visibility
+ log.Infof("configHandler: received event %s for config %v (prev.Name=%s, curr.Name=%s, prev.Namespace=%s, curr.Namespace=%s)",
+ event, curr.GroupVersionKind, prev.Name, curr.Name, prev.Namespace, curr.Namespace)
+
+ // Handle delete events - use prev config if curr is empty
+ cfg := curr
+ if event == model.EventDelete && curr.Name == "" {
+ cfg = prev
+ }
+
+ // Build ConfigKey for the changed config
+ // Find the schema to get the kind.Kind
+ schema, found := collections.Planet.FindByGroupVersionKind(cfg.GroupVersionKind)
+ if !found {
+ log.Warnf("configHandler: schema not found for %v, skipping", cfg.GroupVersionKind)
+ return
+ }
+
+ // Map GVK to kind.Kind using schema identifier
+ // This matches Istio's approach of using gvk.MustToKind, but we use schema.Identifier() instead
+ schemaID := schema.Identifier()
+ log.Infof("configHandler: processing config change, schema identifier=%s, GVK=%v, name=%s/%s, event=%s",
+ schemaID, cfg.GroupVersionKind, cfg.Namespace, cfg.Name, event)
+
+ var configKind kind.Kind
+ switch schemaID {
+ case "SubsetRule":
+ configKind = kind.SubsetRule
+ case "serviceRoute", "ServiceRoute":
+ configKind = kind.ServiceRoute
+ case "PeerAuthentication":
+ configKind = kind.PeerAuthentication
+ default:
+ log.Debugf("configHandler: unknown schema identifier %s for %v, skipping", schemaID, cfg.GroupVersionKind)
+ return
+ }
+
+ configKey := model.ConfigKey{
+ Kind: configKind,
+ Name: cfg.Name,
+ Namespace: cfg.Namespace,
+ }
+
+ // Log the config change
+ log.Infof("configHandler: %s event for %s/%s/%s", event, configKey.Kind, configKey.Namespace, configKey.Name)
+
+ // CRITICAL: SubsetRule and ServiceRoute changes need a full push so that
+ // PushContext is re-initialized and the configuration is reloaded;
+ // these configs affect CDS/RDS generation and need a complete context refresh
+ needsFullPush := configKind == kind.SubsetRule || configKind == kind.ServiceRoute
+
+ // Trigger ConfigUpdate to push changes to all connected proxies
+ s.XDSServer.ConfigUpdate(&model.PushRequest{
+ ConfigsUpdated: sets.New(configKey),
+ Reason: model.NewReasonStats(model.DependentResource),
+ Full: needsFullPush, // Full push for SubsetRule/ServiceRoute to reload PushContext
+ })
+ }
+ schemas := collections.Planet.All()
+ log.Infof("initRegistryEventHandlers: found %d schemas to register", len(schemas))
+ registeredCount := 0
+ for _, schema := range schemas {
+ gvk := schema.GroupVersionKind()
+ schemaID := schema.Identifier()
+ log.Infof("initRegistryEventHandlers: registering event handler for %s (GVK: %v)", schemaID, gvk)
+ s.configController.RegisterEventHandler(gvk, configHandler)
+ registeredCount++
+ log.Infof("initRegistryEventHandlers: successfully registered event handler for %s (GVK: %v)", schemaID, gvk)
+ }
+ log.Infof("initRegistryEventHandlers: successfully registered event handlers for %d schemas", registeredCount)
}
func (s *Server) addReadinessProbe(name string, fn readinessProbe) {
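Note: the sketch below condenses what the registered handler does once a Planet config changes. The `configUpdater` interface and `pushForConfigChange` helper are hypothetical (the ConfigUpdate signature is assumed to match the call in server.go above); the types it uses (`model.ConfigKey`, `model.PushRequest`, `kind`, `sets`) are the ones imported in this file.

```go
package example

import (
	"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
	"github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
	"github.com/apache/dubbo-kubernetes/pkg/util/sets"
)

// configUpdater is a hypothetical narrow view of the XDS server, assumed to
// match the s.XDSServer.ConfigUpdate call shown in the diff above.
type configUpdater interface {
	ConfigUpdate(req *model.PushRequest)
}

// pushForConfigChange maps a changed config to a ConfigKey and requests a push.
func pushForConfigChange(xds configUpdater, k kind.Kind, namespace, name string) {
	key := model.ConfigKey{Kind: k, Name: name, Namespace: namespace}
	xds.ConfigUpdate(&model.PushRequest{
		ConfigsUpdated: sets.New(key),
		Reason:         model.NewReasonStats(model.DependentResource),
		// SubsetRule and ServiceRoute feed CDS/RDS, so they need a full push
		// to force PushContext re-initialization.
		Full: k == kind.SubsetRule || k == kind.ServiceRoute,
	})
}
```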
diff --git a/dubbod/planet/pkg/bootstrap/validation.go b/dubbod/planet/pkg/bootstrap/validation.go
index 6c53b01..05a7df9 100644
--- a/dubbod/planet/pkg/bootstrap/validation.go
+++ b/dubbod/planet/pkg/bootstrap/validation.go
@@ -19,6 +19,7 @@
import (
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/features"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
"github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/pkg/webhooks/server"
"github.com/apache/dubbo-kubernetes/pkg/webhooks/validation/controller"
@@ -30,6 +31,7 @@
}
log.Info("initializing config validator")
params := server.Options{
+ Schemas: collections.Planet,
DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,
Mux: s.httpsMux,
}
diff --git a/dubbod/planet/pkg/config/aggregate/config.go b/dubbod/planet/pkg/config/aggregate/config.go
index cc64232..f90702a 100644
--- a/dubbod/planet/pkg/config/aggregate/config.go
+++ b/dubbod/planet/pkg/config/aggregate/config.go
@@ -28,6 +28,7 @@
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
)
@@ -158,11 +159,19 @@
}
func (cr *storeCache) RegisterEventHandler(kind config.GroupVersionKind, handler model.EventHandler) {
- for _, cache := range cr.caches {
+ log := log.RegisterScope("aggregate", "aggregate config controller")
+ log.Infof("RegisterEventHandler: registering handler for %v across %d caches", kind, len(cr.caches))
+ registeredCount := 0
+ for i, cache := range cr.caches {
if _, exists := cache.Schemas().FindByGroupVersionKind(kind); exists {
+ log.Infof("RegisterEventHandler: registering handler for %v on cache[%d] (type=%T)", kind, i, cache)
cache.RegisterEventHandler(kind, handler)
+ registeredCount++
+ } else {
+ log.Debugf("RegisterEventHandler: cache[%d] does not support %v, skipping", i, kind)
}
}
+ log.Infof("RegisterEventHandler: successfully registered handler for %v on %d caches", kind, registeredCount)
}
func (cr *storeCache) Run(stop <-chan struct{}) {
diff --git a/dubbod/planet/pkg/config/kube/crdclient/client.go b/dubbod/planet/pkg/config/kube/crdclient/client.go
index c251efa..e4cab97 100644
--- a/dubbod/planet/pkg/config/kube/crdclient/client.go
+++ b/dubbod/planet/pkg/config/kube/crdclient/client.go
@@ -26,6 +26,7 @@
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
"github.com/apache/dubbo-kubernetes/pkg/kube"
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
@@ -152,19 +153,20 @@
}
func (cl *Client) addCRD(name string, opts krt.OptionsBuilder) {
- cl.logger.Debugf("adding CRD %q", name)
+ cl.logger.Infof("addCRD: adding CRD %q", name)
s, f := cl.schemasByCRDName[name]
if !f {
- cl.logger.Debugf("added resource that we are not watching: %v", name)
+ cl.logger.Warnf("addCRD: added resource that we are not watching: %v", name)
return
}
resourceGVK := s.GroupVersionKind()
gvr := s.GroupVersionResource()
+ cl.logger.Infof("addCRD: CRD %q maps to GVK %v, GVR %v", name, resourceGVK, gvr)
cl.kindsMu.Lock()
defer cl.kindsMu.Unlock()
if _, f := cl.kinds[resourceGVK]; f {
- cl.logger.Debugf("added resource that already exists: %v", resourceGVK)
+ cl.logger.Warnf("addCRD: added resource that already exists: %v", resourceGVK)
return
}
translateFunc, f := translationMap[resourceGVK]
@@ -189,6 +191,9 @@
var namespaceFilter kubetypes.DynamicObjectFilter
if !s.IsClusterScoped() {
namespaceFilter = cl.client.ObjectFilter()
+ cl.logger.Infof("addCRD: using namespace filter for %v (not cluster-scoped)", resourceGVK)
+ } else {
+ cl.logger.Infof("addCRD: no namespace filter for %v (cluster-scoped)", resourceGVK)
}
filter := kubetypes.Filter{
@@ -196,15 +201,23 @@
ObjectTransform: transform,
FieldSelector: fieldSelector,
}
+ cl.logger.Infof("addCRD: created filter for %v (namespaceFilter=%v, extraFilter=%v, fieldSelector=%v)", resourceGVK, namespaceFilter != nil, extraFilter != nil, fieldSelector)
var kc kclient.Untyped
if s.IsBuiltin() {
kc = kclient.NewUntypedInformer(cl.client, gvr, filter)
} else {
+ // For SubsetRule and ServiceRoute we use the Dynamic client, which returns unstructured objects,
+ // so the informer must be created as a DynamicInformer to handle them
+ informerType := kubetypes.StandardInformer
+ if resourceGVK == gvk.SubsetRule || resourceGVK == gvk.ServiceRoute || resourceGVK == gvk.PeerAuthentication {
+ informerType = kubetypes.DynamicInformer
+ cl.logger.Infof("addCRD: using DynamicInformer for %v (uses Dynamic client)", resourceGVK)
+ }
kc = kclient.NewDelayedInformer[controllers.Object](
cl.client,
gvr,
- kubetypes.StandardInformer,
+ informerType,
filter,
)
}
@@ -213,15 +226,58 @@
collection := krt.MapCollection(wrappedClient, func(obj controllers.Object) config.Config {
cfg := translateFunc(obj)
cfg.Domain = cl.domainSuffix
+ // Only log at Debug level to avoid spam, but keep it available for diagnosis
+ cl.logger.Debugf("addCRD: MapCollection translating object %s/%s to config for %v", obj.GetNamespace(), obj.GetName(), resourceGVK)
return cfg
}, opts.WithName("collection/"+resourceGVK.Kind)...)
index := krt.NewNamespaceIndex(collection)
+ // Register a debug handler to track all events from the wrappedClient (before MapCollection)
+ // This helps diagnose if events are being filtered before reaching the collection
+ wrappedClientDebugHandler := wrappedClient.RegisterBatch(func(o []krt.Event[controllers.Object]) {
+ if len(o) > 0 {
+ cl.logger.Infof("addCRD: wrappedClient event detected for %v: %d events", resourceGVK, len(o))
+ for i, event := range o {
+ var nameStr, nsStr string
+ if event.New != nil {
+ obj := *event.New
+ nameStr = obj.GetName()
+ nsStr = obj.GetNamespace()
+ } else if event.Old != nil {
+ obj := *event.Old
+ nameStr = obj.GetName()
+ nsStr = obj.GetNamespace()
+ }
+ cl.logger.Infof("addCRD: wrappedClient event[%d] %s for %v (name=%s/%s)",
+ i, event.Event, resourceGVK, nsStr, nameStr)
+ }
+ }
+ }, false)
+ // Register a debug handler to track all events from the collection
+ // This helps diagnose why new config changes might not trigger events
+ // Use false to match Istio's implementation - only process future events, not initial sync
+ debugHandler := collection.RegisterBatch(func(o []krt.Event[config.Config]) {
+ if len(o) > 0 {
+ cl.logger.Infof("addCRD: collection event detected for %v: %d events", resourceGVK, len(o))
+ for i, event := range o {
+ var nameStr, nsStr string
+ if event.New != nil {
+ nameStr = event.New.Name
+ nsStr = event.New.Namespace
+ } else if event.Old != nil {
+ nameStr = event.Old.Name
+ nsStr = event.Old.Namespace
+ }
+ cl.logger.Infof("addCRD: collection event[%d] %s for %v (name=%s/%s)",
+ i, event.Event, resourceGVK, nsStr, nameStr)
+ }
+ }
+ }, false)
cl.kinds[resourceGVK] = nsStore{
collection: collection,
index: index,
handlers: []krt.HandlerRegistration{
- collection.RegisterBatch(func(o []krt.Event[config.Config]) {
- }, false),
+ wrappedClientDebugHandler,
+ debugHandler,
},
}
}
@@ -238,23 +294,59 @@
}
func (cl *Client) RegisterEventHandler(kind config.GroupVersionKind, handler model.EventHandler) {
- if c, ok := cl.kind(kind); ok {
- c.handlers = append(c.handlers, c.collection.RegisterBatch(func(o []krt.Event[config.Config]) {
- for _, event := range o {
- switch event.Event {
- case controllers.EventAdd:
- handler(config.Config{}, *event.New, model.Event(event.Event))
- case controllers.EventUpdate:
- handler(*event.Old, *event.New, model.Event(event.Event))
- case controllers.EventDelete:
- handler(config.Config{}, *event.Old, model.Event(event.Event))
- }
- }
- }, false))
+ cl.kindsMu.Lock()
+ defer cl.kindsMu.Unlock()
+
+ c, ok := cl.kinds[kind]
+ if !ok {
+ cl.logger.Warnf("unknown type: %s", kind)
return
}
- cl.logger.Warnf("unknown type: %s", kind)
+ cl.logger.Infof("RegisterEventHandler: registering handler for %v", kind)
+ // Match Istio's implementation: RegisterBatch returns a HandlerRegistration that is already
+ // registered with the collection, so we just need to append it to handlers to keep a reference
+ // The handler will be called by the collection when events occur, regardless of whether we
+ // update cl.kinds[kind] or not. However, we update it to keep the handlers slice in sync.
+ handlerReg := c.collection.RegisterBatch(func(o []krt.Event[config.Config]) {
+ cl.logger.Infof("RegisterEventHandler: batch handler triggered for %v with %d events", kind, len(o))
+ for i, event := range o {
+ var nameStr, nsStr string
+ if event.New != nil {
+ nameStr = event.New.Name
+ nsStr = event.New.Namespace
+ } else if event.Old != nil {
+ nameStr = event.Old.Name
+ nsStr = event.Old.Namespace
+ }
+ cl.logger.Infof("RegisterEventHandler: processing event[%d] %s for %v (name=%s/%s)",
+ i, event.Event, kind, nsStr, nameStr)
+ switch event.Event {
+ case controllers.EventAdd:
+ if event.New != nil {
+ handler(config.Config{}, *event.New, model.Event(event.Event))
+ } else {
+ cl.logger.Warnf("RegisterEventHandler: EventAdd but event.New is nil, skipping")
+ }
+ case controllers.EventUpdate:
+ if event.Old != nil && event.New != nil {
+ handler(*event.Old, *event.New, model.Event(event.Event))
+ } else {
+ cl.logger.Warnf("RegisterEventHandler: EventUpdate but event.Old or event.New is nil, skipping")
+ }
+ case controllers.EventDelete:
+ if event.Old != nil {
+ handler(config.Config{}, *event.Old, model.Event(event.Event))
+ } else {
+ cl.logger.Warnf("RegisterEventHandler: EventDelete but event.Old is nil, skipping")
+ }
+ }
+ }
+ }, false)
+ // Update handlers slice to keep reference (though not strictly necessary for functionality)
+ c.handlers = append(c.handlers, handlerReg)
+ cl.kinds[kind] = c
+ cl.logger.Infof("RegisterEventHandler: successfully registered handler for %v", kind)
}
func (cl *Client) Get(typ config.GroupVersionKind, name, namespace string) *config.Config {
@@ -333,14 +425,43 @@
func (cl *Client) List(kind config.GroupVersionKind, namespace string) []config.Config {
h, f := cl.kind(kind)
if !f {
+ cl.logger.Warnf("List: unknown kind %v", kind)
return nil
}
- if namespace == metav1.NamespaceAll {
- return h.collection.List()
+ // Check if collection is synced
+ if !h.collection.HasSynced() {
+ cl.logger.Warnf("List: collection for %v is not synced yet", kind)
}
- return h.index.Lookup(namespace)
+ var configs []config.Config
+ if namespace == metav1.NamespaceAll {
+ // Get all configs from collection
+ configs = h.collection.List()
+ cl.logger.Infof("List: found %d configs for %v (namespace=all, synced=%v)",
+ len(configs), kind, h.collection.HasSynced())
+ if len(configs) > 0 {
+ for i, cfg := range configs {
+ cl.logger.Infof("List: config[%d] %s/%s for %v", i, cfg.Namespace, cfg.Name, kind)
+ }
+ } else {
+ cl.logger.Warnf("List: collection returned 0 configs for %v (synced=%v), this may indicate informer is not watching correctly or resources are being filtered", kind, h.collection.HasSynced())
+ }
+ // Log collection type for diagnosis
+ cl.logger.Infof("List: collection type is %T, HasSynced=%v", h.collection, h.collection.HasSynced())
+ } else {
+ configs = h.index.Lookup(namespace)
+ cl.logger.Infof("List: found %d configs for %v in namespace %s (synced=%v)", len(configs), kind, namespace, h.collection.HasSynced())
+ if len(configs) > 0 {
+ for i, cfg := range configs {
+ cl.logger.Infof("List: config[%d] %s/%s for %v", i, cfg.Namespace, cfg.Name, kind)
+ }
+ } else {
+ cl.logger.Warnf("List: found 0 configs for %v in namespace %s (synced=%v), checking if resources exist in cluster", kind, namespace, h.collection.HasSynced())
+ }
+ }
+
+ return configs
}
func getObjectMetadata(config config.Config) metav1.ObjectMeta {
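The RegisterEventHandler rewrite above boils down to bridging krt batch events to the classic `(prev, curr, event)` handler signature with nil guards on `event.Old`/`event.New`. A condensed, standalone version follows; the `dispatch` helper is hypothetical and the krt import path is assumed to mirror the repo layout used by the other imports in this file.

```go
package example

import (
	"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
	"github.com/apache/dubbo-kubernetes/pkg/config"
	"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
	"github.com/apache/dubbo-kubernetes/pkg/kube/krt" // assumed path
)

// dispatch forwards batched krt events to a model.EventHandler, skipping
// events whose Old/New payloads are missing so a malformed event cannot panic.
func dispatch(events []krt.Event[config.Config], handler model.EventHandler) {
	for _, e := range events {
		switch e.Event {
		case controllers.EventAdd:
			if e.New != nil {
				handler(config.Config{}, *e.New, model.Event(e.Event))
			}
		case controllers.EventUpdate:
			if e.Old != nil && e.New != nil {
				handler(*e.Old, *e.New, model.Event(e.Event))
			}
		case controllers.EventDelete:
			if e.Old != nil {
				handler(config.Config{}, *e.Old, model.Event(e.Event))
			}
		}
	}
}
```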
diff --git a/dubbod/planet/pkg/config/kube/crdclient/types.go b/dubbod/planet/pkg/config/kube/crdclient/types.go
index 9e70c24..c5d62fa 100644
--- a/dubbod/planet/pkg/config/kube/crdclient/types.go
+++ b/dubbod/planet/pkg/config/kube/crdclient/types.go
@@ -37,7 +37,9 @@
k8sioapidiscoveryv1 "k8s.io/api/discovery/v1"
k8sioapiextensionsapiserverpkgapisapiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
)
@@ -53,13 +55,30 @@
func create(c kube.Client, cfg config.Config, objMeta metav1.ObjectMeta) (metav1.Object, error) {
switch cfg.GroupVersionKind {
case gvk.SubsetRule:
+ // SubsetRule uses networking.dubbo.apache.org API group, not networking.istio.io
+ // Use Dynamic client to access it, but reuse Istio's DestinationRule spec structure
spec := cfg.Spec.(*istioioapinetworkingv1alpha3.DestinationRule)
clonedSpec := protomarshal.Clone(spec)
obj := &apiistioioapinetworkingv1.DestinationRule{
ObjectMeta: objMeta,
}
assignSpec(&obj.Spec, clonedSpec)
- return c.Dubbo().NetworkingV1().DestinationRules(cfg.Namespace).Create(context.TODO(), obj, metav1.CreateOptions{})
+ // Convert to unstructured for Dynamic client
+ uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert DestinationRule to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Kind: "SubsetRule",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "subsetrules",
+ }).Namespace(cfg.Namespace).Create(context.TODO(), u, metav1.CreateOptions{})
case gvk.PeerAuthentication:
spec := cfg.Spec.(*istioioapisecurityv1beta1.PeerAuthentication)
clonedSpec := protomarshal.Clone(spec)
@@ -67,15 +86,46 @@
ObjectMeta: objMeta,
}
assignSpec(&obj.Spec, clonedSpec)
- return c.Dubbo().SecurityV1().PeerAuthentications(cfg.Namespace).Create(context.TODO(), obj, metav1.CreateOptions{})
+ uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert PeerAuthentication to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Kind: "PeerAuthentication",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Resource: "peerauthentications",
+ }).Namespace(cfg.Namespace).Create(context.TODO(), u, metav1.CreateOptions{})
case gvk.ServiceRoute:
+ // ServiceRoute uses networking.dubbo.apache.org API group, not networking.istio.io
+ // Use Dynamic client to access it, but reuse Istio's VirtualService spec structure
spec := cfg.Spec.(*istioioapinetworkingv1alpha3.VirtualService)
clonedSpec := protomarshal.Clone(spec)
obj := &apiistioioapinetworkingv1.VirtualService{
ObjectMeta: objMeta,
}
assignSpec(&obj.Spec, clonedSpec)
- return c.Dubbo().NetworkingV1().VirtualServices(cfg.Namespace).Create(context.TODO(), obj, metav1.CreateOptions{})
+ // Convert to unstructured for Dynamic client
+ uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert VirtualService to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Kind: "ServiceRoute",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "serviceroutes",
+ }).Namespace(cfg.Namespace).Create(context.TODO(), u, metav1.CreateOptions{})
default:
return nil, fmt.Errorf("unsupported type: %v", cfg.GroupVersionKind)
}
@@ -84,13 +134,28 @@
func update(c kube.Client, cfg config.Config, objMeta metav1.ObjectMeta) (metav1.Object, error) {
switch cfg.GroupVersionKind {
case gvk.SubsetRule:
+ // SubsetRule uses networking.dubbo.apache.org API group, use Dynamic client
spec := cfg.Spec.(*istioioapinetworkingv1alpha3.DestinationRule)
clonedSpec := protomarshal.Clone(spec)
obj := &apiistioioapinetworkingv1.DestinationRule{
ObjectMeta: objMeta,
}
assignSpec(&obj.Spec, clonedSpec)
- return c.Dubbo().NetworkingV1().DestinationRules(cfg.Namespace).Update(context.TODO(), obj, metav1.UpdateOptions{})
+ uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert DestinationRule to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Kind: "SubsetRule",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "subsetrules",
+ }).Namespace(cfg.Namespace).Update(context.TODO(), u, metav1.UpdateOptions{})
case gvk.PeerAuthentication:
spec := cfg.Spec.(*istioioapisecurityv1beta1.PeerAuthentication)
clonedSpec := protomarshal.Clone(spec)
@@ -98,15 +163,44 @@
ObjectMeta: objMeta,
}
assignSpec(&obj.Spec, clonedSpec)
- return c.Dubbo().SecurityV1().PeerAuthentications(cfg.Namespace).Update(context.TODO(), obj, metav1.UpdateOptions{})
+ uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert PeerAuthentication to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Kind: "PeerAuthentication",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Resource: "peerauthentications",
+ }).Namespace(cfg.Namespace).Update(context.TODO(), u, metav1.UpdateOptions{})
case gvk.ServiceRoute:
+ // ServiceRoute uses networking.dubbo.apache.org API group, use Dynamic client
spec := cfg.Spec.(*istioioapinetworkingv1alpha3.VirtualService)
clonedSpec := protomarshal.Clone(spec)
obj := &apiistioioapinetworkingv1.VirtualService{
ObjectMeta: objMeta,
}
assignSpec(&obj.Spec, clonedSpec)
- return c.Dubbo().NetworkingV1().VirtualServices(cfg.Namespace).Update(context.TODO(), obj, metav1.UpdateOptions{})
+ uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert VirtualService to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Kind: "ServiceRoute",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "serviceroutes",
+ }).Namespace(cfg.Namespace).Update(context.TODO(), u, metav1.UpdateOptions{})
default:
return nil, fmt.Errorf("unsupported type: %v", cfg.GroupVersionKind)
}
@@ -115,13 +209,28 @@
func updateStatus(c kube.Client, cfg config.Config, objMeta metav1.ObjectMeta) (metav1.Object, error) {
switch cfg.GroupVersionKind {
case gvk.SubsetRule:
+ // SubsetRule uses networking.dubbo.apache.org API group, use Dynamic client
status := cfg.Status.(*istioioapimetav1alpha1.IstioStatus)
clonedStatus := protomarshal.Clone(status)
obj := &apiistioioapinetworkingv1.DestinationRule{
ObjectMeta: objMeta,
}
assignSpec(&obj.Status, clonedStatus)
- return c.Dubbo().NetworkingV1().DestinationRules(cfg.Namespace).UpdateStatus(context.TODO(), obj, metav1.UpdateOptions{})
+ uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert DestinationRule to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Kind: "SubsetRule",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "subsetrules",
+ }).Namespace(cfg.Namespace).UpdateStatus(context.TODO(), u, metav1.UpdateOptions{})
case gvk.PeerAuthentication:
status := cfg.Status.(*istioioapimetav1alpha1.IstioStatus)
clonedStatus := protomarshal.Clone(status)
@@ -129,15 +238,44 @@
ObjectMeta: objMeta,
}
assignSpec(&obj.Status, clonedStatus)
- return c.Dubbo().SecurityV1().PeerAuthentications(cfg.Namespace).UpdateStatus(context.TODO(), obj, metav1.UpdateOptions{})
+ uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert PeerAuthentication status to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Kind: "PeerAuthentication",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Resource: "peerauthentications",
+ }).Namespace(cfg.Namespace).UpdateStatus(context.TODO(), u, metav1.UpdateOptions{})
case gvk.ServiceRoute:
+ // ServiceRoute uses networking.dubbo.apache.org API group, use Dynamic client
status := cfg.Status.(*istioioapimetav1alpha1.IstioStatus)
clonedStatus := protomarshal.Clone(status)
obj := &apiistioioapinetworkingv1.VirtualService{
ObjectMeta: objMeta,
}
assignSpec(&obj.Status, clonedStatus)
- return c.Dubbo().NetworkingV1().VirtualServices(cfg.Namespace).UpdateStatus(context.TODO(), obj, metav1.UpdateOptions{})
+ uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert VirtualService to unstructured: %v", err)
+ }
+ u := &unstructured.Unstructured{Object: uObj}
+ u.SetGroupVersionKind(schema.GroupVersionKind{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Kind: "ServiceRoute",
+ })
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "serviceroutes",
+ }).Namespace(cfg.Namespace).UpdateStatus(context.TODO(), u, metav1.UpdateOptions{})
default:
return nil, fmt.Errorf("unsupported type: %v", cfg.GroupVersionKind)
}
@@ -149,6 +287,7 @@
}
switch orig.GroupVersionKind {
case gvk.SubsetRule:
+ // SubsetRule uses networking.dubbo.apache.org API group, use Dynamic client
origSpec := orig.Spec.(*istioioapinetworkingv1alpha3.DestinationRule)
modSpec := mod.Spec.(*istioioapinetworkingv1alpha3.DestinationRule)
clonedOrigSpec := protomarshal.Clone(origSpec)
@@ -165,8 +304,11 @@
if err != nil {
return nil, err
}
- return c.Dubbo().NetworkingV1().DestinationRules(orig.Namespace).
- Patch(context.TODO(), orig.Name, typ, patchBytes, metav1.PatchOptions{FieldManager: "planet-discovery"})
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "subsetrules",
+ }).Namespace(orig.Namespace).Patch(context.TODO(), orig.Name, typ, patchBytes, metav1.PatchOptions{FieldManager: "planet-discovery"})
case gvk.PeerAuthentication:
origSpec := orig.Spec.(*istioioapisecurityv1beta1.PeerAuthentication)
modSpec := mod.Spec.(*istioioapisecurityv1beta1.PeerAuthentication)
@@ -184,9 +326,13 @@
if err != nil {
return nil, err
}
- return c.Dubbo().SecurityV1().PeerAuthentications(orig.Namespace).
- Patch(context.TODO(), orig.Name, typ, patchBytes, metav1.PatchOptions{FieldManager: "planet-discovery"})
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Resource: "peerauthentications",
+ }).Namespace(orig.Namespace).Patch(context.TODO(), orig.Name, typ, patchBytes, metav1.PatchOptions{FieldManager: "planet-discovery"})
case gvk.ServiceRoute:
+ // ServiceRoute uses networking.dubbo.apache.org API group, use Dynamic client
origSpec := orig.Spec.(*istioioapinetworkingv1alpha3.VirtualService)
modSpec := mod.Spec.(*istioioapinetworkingv1alpha3.VirtualService)
clonedOrigSpec := protomarshal.Clone(origSpec)
@@ -203,8 +349,11 @@
if err != nil {
return nil, err
}
- return c.Dubbo().NetworkingV1().VirtualServices(orig.Namespace).
- Patch(context.TODO(), orig.Name, typ, patchBytes, metav1.PatchOptions{FieldManager: "planet-discovery"})
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "serviceroutes",
+ }).Namespace(orig.Namespace).Patch(context.TODO(), orig.Name, typ, patchBytes, metav1.PatchOptions{FieldManager: "planet-discovery"})
default:
return nil, fmt.Errorf("unsupported type: %v", orig.GroupVersionKind)
}
@@ -217,11 +366,25 @@
}
switch typ {
case gvk.SubsetRule:
- return c.Dubbo().NetworkingV1().DestinationRules(namespace).Delete(context.TODO(), name, deleteOptions)
+ // SubsetRule uses networking.dubbo.apache.org API group, use Dynamic client
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "subsetrules",
+ }).Namespace(namespace).Delete(context.TODO(), name, deleteOptions)
case gvk.PeerAuthentication:
- return c.Dubbo().SecurityV1().PeerAuthentications(namespace).Delete(context.TODO(), name, deleteOptions)
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Resource: "peerauthentications",
+ }).Namespace(namespace).Delete(context.TODO(), name, deleteOptions)
case gvk.ServiceRoute:
- return c.Dubbo().NetworkingV1().VirtualServices(namespace).Delete(context.TODO(), name, deleteOptions)
+ // ServiceRoute uses networking.dubbo.apache.org API group, use Dynamic client
+ return c.Dynamic().Resource(schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "serviceroutes",
+ }).Namespace(namespace).Delete(context.TODO(), name, deleteOptions)
default:
return fmt.Errorf("unsupported type: %v", typ)
}
@@ -301,7 +464,32 @@
}
},
gvk.SubsetRule: func(r runtime.Object) config.Config {
- obj := r.(*apiistioioapinetworkingv1.DestinationRule)
+ var obj *apiistioioapinetworkingv1.DestinationRule
+ // Handle unstructured objects from Dynamic client
+ // First try to convert from unstructured, as Dynamic client returns unstructured objects
+ // Note: r may be controllers.Object which embeds runtime.Object, so we need to check the concrete type
+ switch v := r.(type) {
+ case *unstructured.Unstructured:
+ obj = &apiistioioapinetworkingv1.DestinationRule{}
+ if err := runtime.DefaultUnstructuredConverter.FromUnstructured(v.Object, obj); err != nil {
+ panic(fmt.Sprintf("failed to convert unstructured to DestinationRule: %v", err))
+ }
+ case *apiistioioapinetworkingv1.DestinationRule:
+ // Handle typed objects from Istio client
+ obj = v
+ default:
+ // Fallback: try to convert any runtime.Object to unstructured first, then to DestinationRule
+ uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(r)
+ if err == nil {
+ u := &unstructured.Unstructured{Object: uObj}
+ obj = &apiistioioapinetworkingv1.DestinationRule{}
+ if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, obj); err != nil {
+ panic(fmt.Sprintf("failed to convert object %T to DestinationRule: %v", r, err))
+ }
+ } else {
+ panic(fmt.Sprintf("unexpected object type for SubsetRule: %T, expected *unstructured.Unstructured or *apiistioioapinetworkingv1.DestinationRule, conversion error: %v", r, err))
+ }
+ }
return config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.SubsetRule,
@@ -338,7 +526,27 @@
}
},
gvk.PeerAuthentication: func(r runtime.Object) config.Config {
- obj := r.(*apiistioioapisecurityv1.PeerAuthentication)
+ var obj *apiistioioapisecurityv1.PeerAuthentication
+ switch v := r.(type) {
+ case *unstructured.Unstructured:
+ obj = &apiistioioapisecurityv1.PeerAuthentication{}
+ if err := runtime.DefaultUnstructuredConverter.FromUnstructured(v.Object, obj); err != nil {
+ panic(fmt.Sprintf("failed to convert unstructured to PeerAuthentication: %v", err))
+ }
+ case *apiistioioapisecurityv1.PeerAuthentication:
+ obj = v
+ default:
+ uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(r)
+ if err == nil {
+ u := &unstructured.Unstructured{Object: uObj}
+ obj = &apiistioioapisecurityv1.PeerAuthentication{}
+ if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, obj); err != nil {
+ panic(fmt.Sprintf("failed to convert object %T to PeerAuthentication: %v", r, err))
+ }
+ } else {
+ panic(fmt.Sprintf("unexpected object type for PeerAuthentication: %T, conversion error: %v", r, err))
+ }
+ }
return config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.PeerAuthentication,
@@ -466,7 +674,32 @@
}
},
gvk.ServiceRoute: func(r runtime.Object) config.Config {
- obj := r.(*apiistioioapinetworkingv1.VirtualService)
+ var obj *apiistioioapinetworkingv1.VirtualService
+ // Handle unstructured objects from Dynamic client
+ // First try to convert from unstructured, as Dynamic client returns unstructured objects
+ // Note: r may be controllers.Object which embeds runtime.Object, so we need to check the concrete type
+ switch v := r.(type) {
+ case *unstructured.Unstructured:
+ obj = &apiistioioapinetworkingv1.VirtualService{}
+ if err := runtime.DefaultUnstructuredConverter.FromUnstructured(v.Object, obj); err != nil {
+ panic(fmt.Sprintf("failed to convert unstructured to VirtualService: %v", err))
+ }
+ case *apiistioioapinetworkingv1.VirtualService:
+ // Handle typed objects from Istio client
+ obj = v
+ default:
+ // Fallback: try to convert any runtime.Object to unstructured first, then to VirtualService
+ uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(r)
+ if err == nil {
+ u := &unstructured.Unstructured{Object: uObj}
+ obj = &apiistioioapinetworkingv1.VirtualService{}
+ if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, obj); err != nil {
+ panic(fmt.Sprintf("failed to convert object %T to VirtualService: %v", r, err))
+ }
+ } else {
+ panic(fmt.Sprintf("unexpected object type for ServiceRoute: %T, expected *unstructured.Unstructured or *apiistioioapinetworkingv1.VirtualService, conversion error: %v", r, err))
+ }
+ }
return config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.ServiceRoute,
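All CRUD for these kinds now goes through the dynamic client against the dubbo.apache.org API groups. Below is a minimal, standalone sketch of that access path: listing SubsetRules in the `default` namespace (an example namespace) and reading `spec.host`, assuming the CRD carries the DestinationRule-style spec the translation functions above rely on.

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the local kubeconfig (sketch; real code wires this differently).
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	dyn, err := dynamic.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	gvr := schema.GroupVersionResource{
		Group:    "networking.dubbo.apache.org",
		Version:  "v1",
		Resource: "subsetrules",
	}
	list, err := dyn.Resource(gvr).Namespace("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, item := range list.Items {
		// spec.host follows the DestinationRule-style spec (assumption noted above).
		host, _, _ := unstructured.NestedString(item.Object, "spec", "host")
		fmt.Printf("%s/%s host=%s\n", item.GetNamespace(), item.GetName(), host)
	}
}
```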
diff --git a/dubbod/planet/pkg/model/push_context.go b/dubbod/planet/pkg/model/push_context.go
index fd8f1d8..7990cd3 100644
--- a/dubbod/planet/pkg/model/push_context.go
+++ b/dubbod/planet/pkg/model/push_context.go
@@ -19,12 +19,14 @@
import (
"cmp"
- "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
- networking "istio.io/api/networking/v1alpha3"
"sort"
"sync"
"time"
+ "github.com/apache/dubbo-kubernetes/pkg/config/labels"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
+ networking "istio.io/api/networking/v1alpha3"
+
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/serviceregistry/provider"
"github.com/apache/dubbo-kubernetes/pkg/cluster"
"github.com/apache/dubbo-kubernetes/pkg/config"
@@ -139,6 +141,9 @@
// Map of VS hostname -> referenced hostnames
referencedDestinations map[string]sets.String
+
+ // hostToRoutes keeps the resolved VirtualServices keyed by host
+ hostToRoutes map[host.Name][]config.Config
}
type subsetRuleIndex struct {
@@ -175,6 +180,7 @@
out := serviceRouteIndex{
delegates: map[ConfigKey][]ConfigKey{},
referencedDestinations: map[string]sets.String{},
+ hostToRoutes: map[host.Name][]config.Config{},
}
return out
}
@@ -505,7 +511,10 @@
}
func (ps *PushContext) createNewContext(env *Environment) {
+ log.Infof("createNewContext: creating new PushContext (full initialization)")
ps.initServiceRegistry(env, nil)
+ ps.initServiceRoutes(env)
+ ps.initSubsetRules(env)
}
func (ps *PushContext) updateContext(env *Environment, oldPushContext *PushContext, pushReq *PushRequest) {
@@ -518,14 +527,45 @@
// Check if serviceRoutes have changed base on:
// 1. ServiceRoute updates in ConfigsUpdated
- serviceRoutesChanged := pushReq != nil && (HasConfigsOfKind(pushReq.ConfigsUpdated, kind.ServiceRoute) ||
+ // 2. Full push (Full: true) - always re-initialize on full push
+ serviceRoutesChanged := pushReq != nil && (pushReq.Full || HasConfigsOfKind(pushReq.ConfigsUpdated, kind.ServiceRoute) ||
len(pushReq.AddressesUpdated) > 0)
- // Check if serviceRoutes have changed base on:
+ if pushReq != nil {
+ serviceRouteCount := 0
+ for cfg := range pushReq.ConfigsUpdated {
+ if cfg.Kind == kind.ServiceRoute {
+ serviceRouteCount++
+ }
+ }
+ if serviceRouteCount > 0 {
+ log.Infof("updateContext: detected %d ServiceRoute config changes", serviceRouteCount)
+ }
+ }
+
+ // Check if subsetRules have changed based on:
// 1. SubsetRule updates in ConfigsUpdated
- subsetRulesChanged := pushReq != nil && (HasConfigsOfKind(pushReq.ConfigsUpdated, kind.SubsetRule) ||
+ // 2. Full push (Full: true) - always re-initialize on full push
+ subsetRulesChanged := pushReq != nil && (pushReq.Full || HasConfigsOfKind(pushReq.ConfigsUpdated, kind.SubsetRule) ||
len(pushReq.AddressesUpdated) > 0)
+ if pushReq != nil {
+ subsetRuleCount := 0
+ for cfg := range pushReq.ConfigsUpdated {
+ if cfg.Kind == kind.SubsetRule {
+ subsetRuleCount++
+ }
+ }
+ if subsetRuleCount > 0 {
+ log.Infof("updateContext: detected %d SubsetRule config changes", subsetRuleCount)
+ }
+ if pushReq.Full {
+ log.Infof("updateContext: Full push requested, will re-initialize SubsetRule and ServiceRoute indexes")
+ }
+ log.Debugf("updateContext: subsetRulesChanged=%v, serviceRoutesChanged=%v, pushReq.ConfigsUpdated size=%d, Full=%v",
+ subsetRulesChanged, serviceRoutesChanged, len(pushReq.ConfigsUpdated), pushReq != nil && pushReq.Full)
+ }
+
// Also check if the actual number of services has changed
// This handles cases where Kubernetes Services are added/removed without ServiceEntry updates
if !servicesChanged && oldPushContext != nil {
@@ -551,14 +591,18 @@
}
if serviceRoutesChanged {
+ log.Infof("updateContext: ServiceRoutes changed, re-initializing ServiceRoute index")
ps.initServiceRoutes(env)
} else {
+ log.Debugf("updateContext: ServiceRoutes unchanged, reusing old ServiceRoute index")
ps.serviceRouteIndex = oldPushContext.serviceRouteIndex
}
if subsetRulesChanged {
+ log.Infof("updateContext: SubsetRules changed, re-initializing SubsetRule index")
ps.initSubsetRules(env)
} else {
+ log.Debugf("updateContext: SubsetRules unchanged, reusing old SubsetRule index")
ps.subsetRuleIndex = oldPushContext.subsetRuleIndex
}
@@ -612,15 +656,34 @@
}
func (ps *PushContext) initServiceRoutes(env *Environment) {
+ log.Infof("initServiceRoutes: starting ServiceRoute initialization")
ps.serviceRouteIndex.referencedDestinations = map[string]sets.String{}
serviceroutes := env.List(gvk.ServiceRoute, NamespaceAll)
+ log.Infof("initServiceRoutes: found %d ServiceRoute configs", len(serviceroutes))
sroutes := make([]config.Config, len(serviceroutes))
for i, r := range serviceroutes {
sroutes[i] = resolveServiceRouteShortnames(r)
+ if vs, ok := r.Spec.(*networking.VirtualService); ok {
+ log.Infof("initServiceRoutes: ServiceRoute %s/%s with hosts %v and %d HTTP routes",
+ r.Namespace, r.Name, vs.Hosts, len(vs.Http))
+ }
}
sroutes, ps.serviceRouteIndex.delegates = mergeServiceRoutesIfNeeded(sroutes, ps.exportToDefaults.serviceRoute)
+ hostToRoutes := make(map[host.Name][]config.Config)
+ for i := range sroutes {
+ vs := sroutes[i].Spec.(*networking.VirtualService)
+ for idx, h := range vs.Hosts {
+ resolvedHost := string(ResolveShortnameToFQDN(h, sroutes[i].Meta))
+ vs.Hosts[idx] = resolvedHost
+ hostName := host.Name(resolvedHost)
+ hostToRoutes[hostName] = append(hostToRoutes[hostName], sroutes[i])
+ log.Debugf("initServiceRoutes: indexed ServiceRoute %s/%s for hostname %s", sroutes[i].Namespace, sroutes[i].Name, hostName)
+ }
+ }
+ ps.serviceRouteIndex.hostToRoutes = hostToRoutes
+ log.Infof("initServiceRoutes: indexed ServiceRoutes for %d hostnames", len(hostToRoutes))
}
// sortConfigBySelectorAndCreationTime sorts the list of config objects based on priority and creation time.
@@ -708,16 +771,42 @@
ps.subsetRuleIndex.namespaceLocal = namespaceLocalSubRules
ps.subsetRuleIndex.exportedByNamespace = exportedDestRulesByNamespace
ps.subsetRuleIndex.rootNamespaceLocal = rootNamespaceLocalDestRules
+
+ // Log indexing results
+ log.Infof("setSubsetRules: indexed %d namespaces with local rules", len(namespaceLocalSubRules))
+ for ns, rules := range namespaceLocalSubRules {
+ totalRules := 0
+ for _, ruleList := range rules.specificSubRules {
+ totalRules += len(ruleList)
+ }
+ log.Infof("setSubsetRules: namespace %s has %d DestinationRules with %d specific hostnames", ns, totalRules, len(rules.specificSubRules))
+ for hostname := range rules.specificSubRules {
+ log.Debugf("setSubsetRules: namespace %s has rules for hostname %s", ns, hostname)
+ }
+ }
+ log.Infof("setSubsetRules: indexed %d namespaces with exported rules", len(exportedDestRulesByNamespace))
+ if rootNamespaceLocalDestRules != nil {
+ totalRootRules := 0
+ for _, ruleList := range rootNamespaceLocalDestRules.specificSubRules {
+ totalRootRules += len(ruleList)
+ }
+ log.Infof("setSubsetRules: root namespace has %d DestinationRules with %d specific hostnames", totalRootRules, len(rootNamespaceLocalDestRules.specificSubRules))
+ }
}
func (ps *PushContext) initSubsetRules(env *Environment) {
configs := env.List(gvk.SubsetRule, NamespaceAll)
+ log.Infof("initSubsetRules: found %d SubsetRule configs", len(configs))
// values returned from ConfigStore.List are immutable.
// Therefore, we make a copy
subRules := make([]config.Config, len(configs))
for i := range subRules {
subRules[i] = configs[i]
+ if dr, ok := configs[i].Spec.(*networking.DestinationRule); ok {
+ log.Infof("initSubsetRules: SubsetRule %s/%s for host %s with %d subsets",
+ configs[i].Namespace, configs[i].Name, dr.Host, len(dr.Subsets))
+ }
}
ps.setSubsetRules(subRules)
@@ -749,6 +838,103 @@
}
}
+// ServiceRouteForHost returns the first ServiceRoute (VirtualService) that matches the given host.
+func (ps *PushContext) ServiceRouteForHost(hostname host.Name) *networking.VirtualService {
+ routes := ps.serviceRouteIndex.hostToRoutes[hostname]
+ if len(routes) == 0 {
+ log.Debugf("ServiceRouteForHost: no ServiceRoute found for hostname %s", hostname)
+ return nil
+ }
+ if vs, ok := routes[0].Spec.(*networking.VirtualService); ok {
+ log.Infof("ServiceRouteForHost: found ServiceRoute %s/%s for hostname %s with %d HTTP routes",
+ routes[0].Namespace, routes[0].Name, hostname, len(vs.Http))
+ return vs
+ }
+ log.Warnf("ServiceRouteForHost: ServiceRoute %s/%s for hostname %s is not a VirtualService",
+ routes[0].Namespace, routes[0].Name, hostname)
+ return nil
+}
+
+// DestinationRuleForService returns the first DestinationRule (SubsetRule) applicable to the service hostname/namespace.
+func (ps *PushContext) DestinationRuleForService(namespace string, hostname host.Name) *networking.DestinationRule {
+ log.Debugf("DestinationRuleForService: looking for DestinationRule for %s/%s", namespace, hostname)
+
+ // Check namespace-local rules first
+ if nsRules := ps.subsetRuleIndex.namespaceLocal[namespace]; nsRules != nil {
+ log.Debugf("DestinationRuleForService: checking namespace-local rules for %s (found %d specific rules)", namespace, len(nsRules.specificSubRules))
+ if dr := firstDestinationRule(nsRules, hostname); dr != nil {
+ log.Infof("DestinationRuleForService: found DestinationRule in namespace-local index for %s/%s with %d subsets", namespace, hostname, len(dr.Subsets))
+ return dr
+ }
+ } else {
+ log.Debugf("DestinationRuleForService: no namespace-local rules for namespace %s", namespace)
+ }
+
+ // Check exported rules
+ log.Debugf("DestinationRuleForService: checking exported rules (found %d exported namespaces)", len(ps.subsetRuleIndex.exportedByNamespace))
+ for ns, exported := range ps.subsetRuleIndex.exportedByNamespace {
+ if dr := firstDestinationRule(exported, hostname); dr != nil {
+ log.Infof("DestinationRuleForService: found DestinationRule in exported rules from namespace %s for %s/%s with %d subsets", ns, namespace, hostname, len(dr.Subsets))
+ return dr
+ }
+ }
+
+ // Finally, check root namespace scoped rules
+ if rootRules := ps.subsetRuleIndex.rootNamespaceLocal; rootRules != nil {
+ log.Debugf("DestinationRuleForService: checking root namespace rules (found %d specific rules)", len(rootRules.specificSubRules))
+ if dr := firstDestinationRule(rootRules, hostname); dr != nil {
+ log.Infof("DestinationRuleForService: found DestinationRule in root namespace for %s/%s with %d subsets", namespace, hostname, len(dr.Subsets))
+ return dr
+ }
+ }
+
+ log.Warnf("DestinationRuleForService: no DestinationRule found for %s/%s", namespace, hostname)
+ return nil
+}
+
+// SubsetLabelsForHost returns the label selector for a subset defined in DestinationRule.
+func (ps *PushContext) SubsetLabelsForHost(namespace string, hostname host.Name, subset string) labels.Instance {
+ if subset == "" {
+ return nil
+ }
+ rule := ps.DestinationRuleForService(namespace, hostname)
+ if rule == nil {
+ return nil
+ }
+ for _, ss := range rule.Subsets {
+ if ss.Name == subset {
+ return labels.Instance(ss.Labels)
+ }
+ }
+ return nil
+}
+
+func firstDestinationRule(csr *consolidatedSubRules, hostname host.Name) *networking.DestinationRule {
+ if csr == nil {
+ log.Debugf("firstDestinationRule: consolidatedSubRules is nil for hostname %s", hostname)
+ return nil
+ }
+ if rules := csr.specificSubRules[hostname]; len(rules) > 0 {
+ log.Debugf("firstDestinationRule: found %d rules for hostname %s", len(rules), hostname)
+ if dr, ok := rules[0].rule.Spec.(*networking.DestinationRule); ok {
+ log.Debugf("firstDestinationRule: successfully cast to DestinationRule for hostname %s", hostname)
+ return dr
+ } else {
+ log.Warnf("firstDestinationRule: failed to cast rule to DestinationRule for hostname %s", hostname)
+ }
+ } else {
+ log.Debugf("firstDestinationRule: no specific rules found for hostname %s (available hostnames: %v)", hostname, func() []string {
+ hosts := make([]string, 0, len(csr.specificSubRules))
+ for h := range csr.specificSubRules {
+ hosts = append(hosts, string(h))
+ }
+ return hosts
+ }())
+ }
+ // TODO: support wildcard hosts
+ return nil
+}
+
func (ps *PushContext) DelegateServiceRoutes(vses []config.Config) []ConfigHash {
var out []ConfigHash
for _, vs := range vses {
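A short sketch of how the new PushContext helpers compose (the `resolveFirstSubset` function is hypothetical): take the ServiceRoute indexed for a host, follow its first HTTP destination, and resolve that destination's subset to the labels declared by the matching SubsetRule.

```go
package example

import (
	"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
	"github.com/apache/dubbo-kubernetes/pkg/config/host"
	"github.com/apache/dubbo-kubernetes/pkg/config/labels"
)

// resolveFirstSubset returns the subset labels behind the first HTTP
// destination of the ServiceRoute for host h, or nil if nothing matches.
func resolveFirstSubset(ps *model.PushContext, namespace string, h host.Name) labels.Instance {
	vs := ps.ServiceRouteForHost(h)
	if vs == nil || len(vs.Http) == 0 || len(vs.Http[0].Route) == 0 {
		return nil
	}
	dest := vs.Http[0].Route[0].Destination
	if dest == nil {
		return nil
	}
	// SubsetLabelsForHost goes through DestinationRuleForService internally
	// and returns the labels of the named subset (nil if not found).
	return ps.SubsetLabelsForHost(namespace, host.Name(dest.Host), dest.Subset)
}
```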
diff --git a/dubbod/planet/pkg/model/serviceroute.go b/dubbod/planet/pkg/model/serviceroute.go
index 46a2819..e455b7e 100644
--- a/dubbod/planet/pkg/model/serviceroute.go
+++ b/dubbod/planet/pkg/model/serviceroute.go
@@ -1,13 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package model
import (
+ "strings"
+
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
"github.com/apache/dubbo-kubernetes/pkg/config/visibility"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
+ "google.golang.org/protobuf/proto"
networking "istio.io/api/networking/v1alpha3"
"k8s.io/apimachinery/pkg/types"
- "strings"
)
func resolveServiceRouteShortnames(config config.Config) config.Config {
@@ -209,14 +228,21 @@
}
func mergeHTTPMatchRequest(root, delegate *networking.HTTPMatchRequest) *networking.HTTPMatchRequest {
- // nolint: govet
- out := *delegate
+ cloned := proto.Clone(delegate)
+ out, ok := cloned.(*networking.HTTPMatchRequest)
+ if !ok {
+ log.Warnf("mergeHTTPMatchRequest: failed to clone HTTPMatchRequest for delegate %s", delegate.GetName())
+ return nil
+ }
+ if out == nil {
+ return nil
+ }
if out.Name == "" {
out.Name = root.Name
} else if root.Name != "" {
out.Name = root.Name + "-" + out.Name
}
- return &out
+ return out
}
func hasConflict(root, leaf *networking.HTTPMatchRequest) bool {
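The switch from `out := *delegate` to `proto.Clone` matters because a shallow copy of a generated protobuf message copies its internal state (hence the old `// nolint: govet` hint) and shares every nested pointer and map, so mutating the "copy" leaks back into the delegate. An illustrative, standalone example (not code from this PR):

```go
package example

import (
	"google.golang.org/protobuf/proto"
	networking "istio.io/api/networking/v1alpha3"
)

// cloneMatch deep-copies the delegate match before mutating it, so the
// delegate's Headers map is never touched.
func cloneMatch(delegate *networking.HTTPMatchRequest) *networking.HTTPMatchRequest {
	if delegate == nil {
		return nil
	}
	out := proto.Clone(delegate).(*networking.HTTPMatchRequest)
	if out.Headers == nil {
		out.Headers = map[string]*networking.StringMatch{}
	}
	// Safe: this writes to the clone's map, not the delegate's.
	out.Headers["x-merged"] = &networking.StringMatch{
		MatchType: &networking.StringMatch_Exact{Exact: "true"},
	}
	return out
}
```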
diff --git a/dubbod/planet/pkg/model/subsetrule.go b/dubbod/planet/pkg/model/subsetrule.go
index f6dab28..954b8cd 100644
--- a/dubbod/planet/pkg/model/subsetrule.go
+++ b/dubbod/planet/pkg/model/subsetrule.go
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package model
import (
diff --git a/dubbod/planet/pkg/networking/grpcgen/cds.go b/dubbod/planet/pkg/networking/grpcgen/cds.go
index 4a3676b..3efff2b 100644
--- a/dubbod/planet/pkg/networking/grpcgen/cds.go
+++ b/dubbod/planet/pkg/networking/grpcgen/cds.go
@@ -19,6 +19,7 @@
import (
"fmt"
+
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/util/protoconv"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
@@ -113,7 +114,8 @@
func (b *clusterBuilder) build() []*cluster.Cluster {
var defaultCluster *cluster.Cluster
- if b.filter.Contains(b.defaultClusterName) {
+ defaultRequested := b.filter == nil || b.filter.Contains(b.defaultClusterName)
+ if defaultRequested {
defaultCluster = b.edsCluster(b.defaultClusterName)
// CRITICAL: For gRPC proxyless, we need to set CommonLbConfig to handle endpoint health status
// Following Istio's implementation, we should include UNHEALTHY and DRAINING endpoints
@@ -136,6 +138,7 @@
core.HealthStatus_DEGRADED,
},
}
+ log.Infof("clusterBuilder.build: generated default cluster %s", b.defaultClusterName)
}
subsetClusters := b.applyDestinationRule(defaultCluster)
@@ -143,7 +146,10 @@
if defaultCluster != nil {
out = append(out, defaultCluster)
}
- return append(out, subsetClusters...)
+ result := append(out, subsetClusters...)
+ log.Infof("clusterBuilder.build: generated %d clusters total (1 default + %d subsets) for %s",
+ len(result), len(subsetClusters), b.defaultClusterName)
+ return result
}
func (b *clusterBuilder) edsCluster(name string) *cluster.Cluster {
@@ -167,8 +173,71 @@
func (b *clusterBuilder) applyDestinationRule(defaultCluster *cluster.Cluster) (subsetClusters []*cluster.Cluster) {
if b.svc == nil || b.port == nil {
+ log.Warnf("applyDestinationRule: service or port is nil for %s", b.defaultClusterName)
return nil
}
- // TODO
- return
+ log.Infof("applyDestinationRule: looking for DestinationRule for service %s/%s (hostname=%s, port=%d)",
+ b.svc.Attributes.Namespace, b.svc.Attributes.Name, b.hostname, b.portNum)
+ dr := b.push.DestinationRuleForService(b.svc.Attributes.Namespace, b.hostname)
+ if dr == nil {
+ log.Warnf("applyDestinationRule: no DestinationRule found for %s/%s", b.svc.Attributes.Namespace, b.hostname)
+ return nil
+ }
+ if len(dr.Subsets) == 0 {
+ log.Warnf("applyDestinationRule: DestinationRule found for %s/%s but has no subsets", b.svc.Attributes.Namespace, b.hostname)
+ return nil
+ }
+
+ log.Infof("applyDestinationRule: found DestinationRule for %s/%s with %d subsets, defaultCluster requested=%v",
+ b.svc.Attributes.Namespace, b.hostname, len(dr.Subsets), defaultCluster != nil)
+
+ var commonLbConfig *cluster.Cluster_CommonLbConfig
+ if defaultCluster != nil {
+ commonLbConfig = defaultCluster.CommonLbConfig
+ } else {
+ commonLbConfig = &cluster.Cluster_CommonLbConfig{
+ OverrideHostStatus: &core.HealthStatusSet{
+ Statuses: []core.HealthStatus{
+ core.HealthStatus_HEALTHY,
+ core.HealthStatus_UNHEALTHY,
+ core.HealthStatus_DRAINING,
+ core.HealthStatus_UNKNOWN,
+ core.HealthStatus_DEGRADED,
+ },
+ },
+ }
+ }
+
+ defaultClusterRequested := defaultCluster != nil
+ if b.filter != nil {
+ defaultClusterRequested = b.filter.Contains(b.defaultClusterName)
+ }
+
+ for _, subset := range dr.Subsets {
+ if subset == nil || subset.Name == "" {
+ continue
+ }
+ clusterName := model.BuildSubsetKey(model.TrafficDirectionOutbound, subset.Name, b.hostname, b.portNum)
+
+ // CRITICAL: Always generate subset clusters if default cluster is requested
+ // This is essential for RDS WeightedCluster to work correctly
+ shouldGenerate := true
+ if b.filter != nil && !b.filter.Contains(clusterName) {
+ // Subset cluster not explicitly requested, but generate it if default cluster was requested
+ shouldGenerate = defaultClusterRequested
+ }
+
+ if !shouldGenerate {
+ log.Debugf("applyDestinationRule: skipping subset cluster %s (not requested and default not requested)", clusterName)
+ continue
+ }
+
+ log.Infof("applyDestinationRule: generating subset cluster %s for subset %s", clusterName, subset.Name)
+ subsetCluster := b.edsCluster(clusterName)
+ subsetCluster.CommonLbConfig = commonLbConfig
+ subsetClusters = append(subsetClusters, subsetCluster)
+ }
+
+ log.Infof("applyDestinationRule: generated %d subset clusters for %s/%s", len(subsetClusters), b.svc.Attributes.Namespace, b.hostname)
+ return subsetClusters
}
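For orientation, the subset clusters generated above are named with `model.BuildSubsetKey`. Assuming it follows the Istio-style `outbound|<port>|<subset>|<host>` convention, one SubsetRule with subsets `v1` and `v2` yields the default cluster plus two subset clusters, as in this small sketch (`exampleSubsetKeys` and the hostname/port values are illustrative):

```go
package example

import (
	"fmt"

	"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
	"github.com/apache/dubbo-kubernetes/pkg/config/host"
)

// exampleSubsetKeys prints the CDS cluster names for a service with two subsets.
func exampleSubsetKeys() {
	hostname := host.Name("reviews.default.svc.cluster.local")
	for _, subset := range []string{"", "v1", "v2"} { // "" is the default cluster
		fmt.Println(model.BuildSubsetKey(model.TrafficDirectionOutbound, subset, hostname, 8080))
	}
}
```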
diff --git a/dubbod/planet/pkg/networking/grpcgen/rds.go b/dubbod/planet/pkg/networking/grpcgen/rds.go
index 37c74b5..11dcf12 100644
--- a/dubbod/planet/pkg/networking/grpcgen/rds.go
+++ b/dubbod/planet/pkg/networking/grpcgen/rds.go
@@ -19,13 +19,17 @@
import (
"fmt"
- "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/util/protoconv"
- discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"strconv"
"strings"
+ "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/util/protoconv"
+ discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
+ "github.com/apache/dubbo-kubernetes/pkg/config/host"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+ "google.golang.org/protobuf/types/known/wrapperspb"
+ networking "istio.io/api/networking/v1alpha3"
)
func (g *GrpcConfigGenerator) BuildHTTPRoutes(node *model.Proxy, push *model.PushContext, routeNames []string) model.Resources {
@@ -77,28 +81,28 @@
}
domains = append(domains, "*") // Wildcard for any domain - LEAST SPECIFIC
+ outboundRoutes := []*route.Route{
+ defaultSingleClusterRoute(routeName),
+ }
+ if vs := push.ServiceRouteForHost(host.Name(hostStr)); vs != nil {
+ log.Infof("buildHTTPRoute: found ServiceRoute for host %s with %d HTTP routes", hostStr, len(vs.Http))
+ if routes := buildRoutesFromServiceRoute(vs, host.Name(hostStr), parsedPort); len(routes) > 0 {
+ log.Infof("buildHTTPRoute: built %d weighted routes from ServiceRoute for host %s", len(routes), hostStr)
+ outboundRoutes = routes
+ } else {
+ log.Warnf("buildHTTPRoute: ServiceRoute found but no routes built for host %s", hostStr)
+ }
+ } else {
+ log.Debugf("buildHTTPRoute: no ServiceRoute found for host %s, using default route", hostStr)
+ }
+
return &route.RouteConfiguration{
Name: routeName,
VirtualHosts: []*route.VirtualHost{
{
Name: fmt.Sprintf("%s|http|%d", hostStr, parsedPort),
Domains: domains,
- Routes: []*route.Route{
- {
- Match: &route.RouteMatch{
- PathSpecifier: &route.RouteMatch_Prefix{
- Prefix: "/",
- },
- },
- Action: &route.Route_Route{
- Route: &route.RouteAction{
- ClusterSpecifier: &route.RouteAction_Cluster{
- Cluster: routeName, // Use routeName (cluster name)
- },
- },
- },
- },
- },
+ Routes: outboundRoutes,
},
},
}
@@ -126,3 +130,110 @@
},
}
}
+
+func defaultSingleClusterRoute(clusterName string) *route.Route {
+ return &route.Route{
+ Match: &route.RouteMatch{
+ PathSpecifier: &route.RouteMatch_Prefix{
+ Prefix: "/",
+ },
+ },
+ Action: &route.Route_Route{
+ Route: &route.RouteAction{
+ ClusterSpecifier: &route.RouteAction_Cluster{
+ Cluster: clusterName,
+ },
+ },
+ },
+ }
+}
+
+func buildRoutesFromServiceRoute(vs *networking.VirtualService, hostName host.Name, defaultPort int) []*route.Route {
+ if vs == nil || len(vs.Http) == 0 {
+ return nil
+ }
+ var routes []*route.Route
+ for _, httpRoute := range vs.Http {
+ if httpRoute == nil {
+ continue
+ }
+ if built := buildRouteFromHTTPRoute(httpRoute, hostName, defaultPort); built != nil {
+ routes = append(routes, built)
+ }
+ }
+ return routes
+}
+
+func buildRouteFromHTTPRoute(httpRoute *networking.HTTPRoute, hostName host.Name, defaultPort int) *route.Route {
+ if httpRoute == nil || len(httpRoute.Route) == 0 {
+ log.Warnf("buildRouteFromHTTPRoute: httpRoute is nil or has no routes")
+ return nil
+ }
+ log.Infof("buildRouteFromHTTPRoute: processing HTTPRoute with %d route destinations", len(httpRoute.Route))
+ weights := make([]*route.WeightedCluster_ClusterWeight, 0, len(httpRoute.Route))
+ var totalWeight uint32
+ for i, dest := range httpRoute.Route {
+ if dest == nil {
+ log.Warnf("buildRouteFromHTTPRoute: route[%d] is nil", i)
+ continue
+ }
+ destination := dest.Destination
+ if destination == nil {
+ log.Warnf("buildRouteFromHTTPRoute: route[%d] has nil Destination (weight=%d), creating default destination with host=%s",
+ i, dest.Weight, hostName)
+ destination = &networking.Destination{
+ Host: string(hostName),
+ }
+ } else {
+ log.Debugf("buildRouteFromHTTPRoute: route[%d] Destination: host=%s, subset=%s, port=%v, weight=%d",
+ i, destination.Host, destination.Subset, destination.Port, dest.Weight)
+ }
+ targetHost := destination.Host
+ if targetHost == "" {
+ targetHost = string(hostName)
+ }
+ targetPort := defaultPort
+ if destination.Port != nil && destination.Port.Number != 0 {
+ targetPort = int(destination.Port.Number)
+ }
+ subsetName := destination.Subset
+ clusterName := model.BuildSubsetKey(model.TrafficDirectionOutbound, subsetName, host.Name(targetHost), targetPort)
+ weight := dest.Weight
+ if weight <= 0 {
+ weight = 1
+ }
+ totalWeight += uint32(weight)
+ log.Infof("buildRouteFromHTTPRoute: route[%d] -> cluster=%s, subset=%s, weight=%d, host=%s, port=%d",
+ i, clusterName, subsetName, weight, targetHost, targetPort)
+ weights = append(weights, &route.WeightedCluster_ClusterWeight{
+ Name: clusterName,
+ Weight: wrapperspb.UInt32(uint32(weight)),
+ })
+ }
+ if len(weights) == 0 {
+ log.Warnf("buildRouteFromHTTPRoute: no valid weights generated")
+ return nil
+ }
+ weightedClusters := &route.WeightedCluster{
+ Clusters: weights,
+ }
+ if totalWeight > 0 {
+ weightedClusters.TotalWeight = wrapperspb.UInt32(totalWeight)
+ }
+ log.Infof("buildRouteFromHTTPRoute: built WeightedCluster with %d clusters, totalWeight=%d", len(weights), totalWeight)
+
+ return &route.Route{
+ Match: &route.RouteMatch{
+ PathSpecifier: &route.RouteMatch_Prefix{
+ Prefix: "/",
+ },
+ },
+ Action: &route.Route_Route{
+ Route: &route.RouteAction{
+ ClusterSpecifier: &route.RouteAction_WeightedClusters{
+ WeightedClusters: weightedClusters,
+ },
+ },
+ },
+ }
+}
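
As a reference point, a standalone sketch of the weighted route that buildRouteFromHTTPRoute is expected to emit for an 80/20 split across two subsets. The cluster names, host, and port are assumptions for illustration; the go-control-plane types are the same ones used above.

package main

import (
    "fmt"

    route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
    "google.golang.org/protobuf/types/known/wrapperspb"
)

func main() {
    // Two ClusterWeights pointing at the subset clusters generated by CDS; the
    // prefix "/" match mirrors defaultSingleClusterRoute above.
    weighted := &route.Route{
        Match: &route.RouteMatch{
            PathSpecifier: &route.RouteMatch_Prefix{Prefix: "/"},
        },
        Action: &route.Route_Route{
            Route: &route.RouteAction{
                ClusterSpecifier: &route.RouteAction_WeightedClusters{
                    WeightedClusters: &route.WeightedCluster{
                        Clusters: []*route.WeightedCluster_ClusterWeight{
                            {Name: "outbound|17070|v1|consumer.grpc-app.svc.cluster.local", Weight: wrapperspb.UInt32(80)},
                            {Name: "outbound|17070|v2|consumer.grpc-app.svc.cluster.local", Weight: wrapperspb.UInt32(20)},
                        },
                        TotalWeight: wrapperspb.UInt32(100),
                    },
                },
            },
        },
    }
    fmt.Println(weighted.GetRoute().GetWeightedClusters().GetTotalWeight().GetValue()) // 100
}
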
diff --git a/dubbod/planet/pkg/xds/ads.go b/dubbod/planet/pkg/xds/ads.go
index 3ea0321..8a598f7 100644
--- a/dubbod/planet/pkg/xds/ads.go
+++ b/dubbod/planet/pkg/xds/ads.go
@@ -19,12 +19,13 @@
import (
"fmt"
- "github.com/apache/dubbo-kubernetes/pkg/maps"
"strconv"
"strings"
"sync/atomic"
"time"
+ "github.com/apache/dubbo-kubernetes/pkg/maps"
+
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
v3 "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/xds/v3"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
@@ -332,15 +333,12 @@
resourceNamesStr = " [wildcard]"
}
+ // Always log at INFO so that requests triggered manually via grpcurl leave a complete, visible trace

if shouldRespond {
- // Log NEW requests at INFO level - these are triggered by grpcurl requests
- // This makes it easy to see when a grpcurl request triggers xDS configuration
log.Infof("%s: REQ %s resources:%d nonce:%s%s (will respond)", stype,
con.ID(), len(req.ResourceNames), req.ResponseNonce, resourceNamesStr)
} else {
- // Log ACK/ignored requests at DEBUG level to reduce noise
- // These are normal XDS protocol ACKs, not new requests from grpcurl
- log.Debugf("%s: REQ %s resources:%d nonce:%s%s (ACK/ignored)", stype,
+ log.Infof("%s: REQ %s resources:%d nonce:%s%s (ACK/ignored)", stype,
con.ID(), len(req.ResourceNames), req.ResponseNonce, resourceNamesStr)
}
diff --git a/dubbod/planet/pkg/xds/delta.go b/dubbod/planet/pkg/xds/delta.go
index c75a7b2..18d0260 100644
--- a/dubbod/planet/pkg/xds/delta.go
+++ b/dubbod/planet/pkg/xds/delta.go
@@ -19,10 +19,11 @@
import (
"errors"
- dubbolog "github.com/apache/dubbo-kubernetes/pkg/log"
"strings"
"time"
+ dubbolog "github.com/apache/dubbo-kubernetes/pkg/log"
+
dubbogrpc "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/grpc"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/model"
v3 "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/xds/v3"
@@ -67,12 +68,16 @@
return status.Errorf(codes.ResourceExhausted, "request rate limit exceeded: %v", err)
}
- // TODO authenticate
+ ids, err := s.authenticate(ctx)
+ if err != nil {
+ return status.Error(codes.Unauthenticated, err.Error())
+ }
s.globalPushContext().InitContext(s.Env, nil, nil)
con := newDeltaConnection(peerAddr, stream)
+ con.s = s
- go s.receiveDelta(con, nil)
+ go s.receiveDelta(con, ids)
<-con.InitializedCh()
@@ -154,6 +159,18 @@
deltaLog.Infof("new delta connection for node:%s", con.ID())
}
+ subscribeStr := " [wildcard]"
+ if len(req.ResourceNamesSubscribe) > 0 {
+ subscribeStr = " [" + strings.Join(req.ResourceNamesSubscribe, ", ") + "]"
+ }
+ unsubscribeStr := ""
+ if len(req.ResourceNamesUnsubscribe) > 0 {
+ unsubscribeStr = " unsubscribe:[" + strings.Join(req.ResourceNamesUnsubscribe, ", ") + "]"
+ }
+ deltaLog.Infof("%s: RAW DELTA REQ %s sub:%d%s nonce:%s%s",
+ v3.GetShortType(req.TypeUrl), con.ID(), len(req.ResourceNamesSubscribe), subscribeStr,
+ req.ResponseNonce, unsubscribeStr)
+
select {
case con.deltaReqChan <- req:
case <-con.deltaStream.Context().Done():
diff --git a/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go b/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
index 9221855..e18b919 100644
--- a/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
+++ b/dubbod/planet/pkg/xds/endpoints/endpoint_builder.go
@@ -273,14 +273,18 @@
}
func (b *EndpointBuilder) matchesSubset(epLabels labels.Instance) bool {
- // TODO: implement subset matching logic based on SubsetRule
- // For now, return true if no subset is specified or if subset is empty
if b.subsetName == "" {
return true
}
- // Simplified subset matching - in real implementation, this should match
- // against SubsetRule subset labels
- return true
+ if b.service == nil || b.push == nil {
+ return true
+ }
+ selector := b.push.SubsetLabelsForHost(b.service.Attributes.Namespace, b.hostname, b.subsetName)
+ if len(selector) == 0 {
+ // No subset labels defined, treat as match-all
+ return true
+ }
+ return selector.SubsetOf(epLabels)
}
func (b *EndpointBuilder) buildLbEndpoint(ep *model.DubboEndpoint) *endpoint.LbEndpoint {
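
The subset match above boils down to label-subset semantics: an endpoint belongs to a subset when every label in the subset's selector is present on the endpoint. A hedged re-implementation with plain maps (not the repo's labels.Instance type), for illustration only:

package main

import "fmt"

// subsetOf reports whether every key/value in selector is present in epLabels,
// mirroring (as an assumption) the labels.Instance.SubsetOf check used above.
func subsetOf(selector, epLabels map[string]string) bool {
    for k, v := range selector {
        if epLabels[k] != v {
            return false
        }
    }
    return true
}

func main() {
    selector := map[string]string{"version": "v1"}                    // from the SubsetRule subset
    epLabels := map[string]string{"app": "consumer", "version": "v1"} // from the endpoint's pod
    fmt.Println(subsetOf(selector, epLabels))                         // true: endpoint belongs to subset v1
}
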
diff --git a/dubbod/planet/pkg/xds/xdsgen.go b/dubbod/planet/pkg/xds/xdsgen.go
index 318ba13..54e7f1a 100644
--- a/dubbod/planet/pkg/xds/xdsgen.go
+++ b/dubbod/planet/pkg/xds/xdsgen.go
@@ -580,6 +580,26 @@
resp.RemovedResources = sets.SortedList(removed)
}
var newResourceNames sets.String
+ if shouldSetWatchedResources(w) {
+ if usedDelta {
+ if w.ResourceNames != nil {
+ newResourceNames = w.ResourceNames.Copy()
+ } else {
+ newResourceNames = sets.New[string]()
+ }
+ for _, removed := range resp.RemovedResources {
+ newResourceNames.Delete(removed)
+ }
+ for _, r := range res {
+ newResourceNames.Insert(r.Name)
+ }
+ } else {
+ newResourceNames = resourceNamesSet(res)
+ }
+ }
+ if neverRemoveDelta(w.TypeUrl) {
+ resp.RemovedResources = nil
+ }
if len(resp.RemovedResources) > 0 {
deltaLog.Infof("%v REMOVE for node:%s %v", v3.GetShortType(w.TypeUrl), con.ID(), resp.RemovedResources)
}
@@ -632,6 +652,36 @@
return g
}
+func resourceNamesSet(res model.Resources) sets.String {
+ names := sets.New[string]()
+ for _, r := range res {
+ if r != nil {
+ names.Insert(r.Name)
+ }
+ }
+ return names
+}
+
+func shouldSetWatchedResources(w *model.WatchedResource) bool {
+ if w == nil {
+ return false
+ }
+ if requiresResourceNamesModification(w.TypeUrl) {
+ return false
+ }
+ return xds.IsWildcardTypeURL(w.TypeUrl)
+}
+
+func requiresResourceNamesModification(typeURL string) bool {
+ return typeURL == v3.AddressType
+}
+
+func neverRemoveDelta(typeURL string) bool {
+ // Align with Envoy bug https://github.com/envoyproxy/envoy/issues/32823
+ // Skip removals for ExtensionConfiguration to avoid flapping.
+ return typeURL == v3.ExtensionConfigurationType
+}
+
// extractRouteNamesFromLDS extracts route names referenced in LDS listener resources
// For outbound listeners with ApiListener, the route name is the RouteConfigName from Rds config
// Route name format is the same as cluster name: "outbound|port||hostname"
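
A simplified model of the watched-resource bookkeeping added above for wildcard delta pushes: start from the previously watched names, drop removals, then add everything that was sent. Plain maps stand in for the repo's sets.String; the cluster names are illustrative.

package main

import "fmt"

// updateWatched sketches the delta branch of the code above: previously watched names,
// minus removals, plus every resource included in the response.
func updateWatched(prev map[string]bool, sent, removed []string) map[string]bool {
    next := make(map[string]bool, len(prev))
    for name := range prev {
        next[name] = true
    }
    for _, r := range removed {
        delete(next, r)
    }
    for _, s := range sent {
        next[s] = true
    }
    return next
}

func main() {
    prev := map[string]bool{"outbound|17070||consumer.grpc-app.svc.cluster.local": true}
    next := updateWatched(prev,
        []string{"outbound|17070|v1|consumer.grpc-app.svc.cluster.local"},
        nil)
    fmt.Println(len(next)) // 2: the default cluster plus the newly pushed subset cluster
}
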
diff --git a/manifests/charts/base/files/crd-all.yaml b/manifests/charts/base/files/crd-all.yaml
index 6e55e0c..9f82a97 100644
--- a/manifests/charts/base/files/crd-all.yaml
+++ b/manifests/charts/base/files/crd-all.yaml
@@ -197,7 +197,7 @@
redirect or forward (default) traffic.
items:
properties:
- subset:
+ destination:
description: Subset uniquely identifies the instances
of a service to which the request/connection should
be forwarded to.
@@ -225,7 +225,7 @@
format: int32
type: integer
required:
- - subset
+ - destination
type: object
type: array
type: object
diff --git a/manifests/charts/dubbo-control/dubbo-discovery/templates/clusterrole.yaml b/manifests/charts/dubbo-control/dubbo-discovery/templates/clusterrole.yaml
index 3528c94..2efb6f5 100644
--- a/manifests/charts/dubbo-control/dubbo-discovery/templates/clusterrole.yaml
+++ b/manifests/charts/dubbo-control/dubbo-discovery/templates/clusterrole.yaml
@@ -7,6 +7,9 @@
- apiGroups: [ "security.istio.io", "networking.istio.io" ]
verbs: [ "get", "watch", "list" ]
resources: [ "*" ]
+ - apiGroups: [ "security.dubbo.apache.org", "networking.dubbo.apache.org" ]
+ verbs: [ "get", "watch", "list" ]
+ resources: [ "*" ]
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["mutatingwebhookconfigurations"]
verbs: ["get", "list", "watch", "update", "patch"]
diff --git a/manifests/charts/dubbo-control/dubbo-discovery/templates/validatingwebhookconfiguration.yaml b/manifests/charts/dubbo-control/dubbo-discovery/templates/validatingwebhookconfiguration.yaml
index 18c3d77..34c303b 100644
--- a/manifests/charts/dubbo-control/dubbo-discovery/templates/validatingwebhookconfiguration.yaml
+++ b/manifests/charts/dubbo-control/dubbo-discovery/templates/validatingwebhookconfiguration.yaml
@@ -4,24 +4,37 @@
metadata:
name: dubbo-validator-dubbo-system
namespace: default
+ labels:
+ app: dubbod
+ dubbo.apache.org/rev: {{ .Values.revision | default "default" }}
webhooks:
- - name: validation.dubbo.apache.org
+ - name: rev.validation.dubbo.apache.org
+ admissionReviewVersions: ["v1"]
clientConfig:
service:
name: dubbod
namespace: dubbo-system
path: "/validate"
+ caBundle: ""
+ failurePolicy: Ignore
+ objectSelector:
+ matchExpressions:
+ - key: dubbo.apache.org/rev
+ operator: In
+ values:
+ - default
rules:
- - operations:
- - CREATE
- - UPDATE
- apiGroups:
- - security.istio.io
- - networking.istio.io
- apiVersions:
- - "*"
- resources:
- - "*"
+ - operations:
+ - CREATE
+ - UPDATE
+ apiGroups:
+ - security.istio.io
+ - networking.istio.io
+ - security.dubbo.apache.org
+ - networking.dubbo.apache.org
+ apiVersions:
+ - "*"
+ resources:
+ - "*"
sideEffects: None
- admissionReviewVersions: ["v1"]
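
For reference, a hedged sketch of how the objectSelector added above behaves: only objects carrying dubbo.apache.org/rev=default are sent to the validator. The snippet uses upstream apimachinery label selectors purely for illustration.

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
)

func main() {
    // The webhook's objectSelector, expressed programmatically.
    sel, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
        MatchExpressions: []metav1.LabelSelectorRequirement{{
            Key:      "dubbo.apache.org/rev",
            Operator: metav1.LabelSelectorOpIn,
            Values:   []string{"default"},
        }},
    })
    if err != nil {
        panic(err)
    }
    fmt.Println(sel.Matches(labels.Set{"dubbo.apache.org/rev": "default"})) // true: object is validated
    fmt.Println(sel.Matches(labels.Set{}))                                  // false: webhook skips the object
}
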
diff --git a/pkg/config/schema/collections/collections.agent.go b/pkg/config/schema/collections/collections.agent.go
index b7a6a89..60bb62d 100644
--- a/pkg/config/schema/collections/collections.agent.go
+++ b/pkg/config/schema/collections/collections.agent.go
@@ -49,7 +49,7 @@
Identifier: "SubsetRule",
Group: "networking.dubbo.apache.org",
Kind: "SubsetRule",
- Plural: "destinationrules",
+ Plural: "subsetrules",
Version: "v1",
VersionAliases: []string{},
Proto: "istio.networking.v1alpha3.DestinationRule", StatusProto: "istio.meta.v1alpha1.IstioStatus",
@@ -60,9 +60,9 @@
Builtin: false,
}.MustBuild()
ServiceRoute = collection.Builder{
- Identifier: "serviceRoute",
+ Identifier: "ServiceRoute",
Group: "networking.dubbo.apache.org",
- Kind: "serviceRoute",
+ Kind: "ServiceRoute",
Plural: "serviceroutes",
Version: "v1",
VersionAliases: []string{},
diff --git a/pkg/config/schema/collections/collections.go b/pkg/config/schema/collections/collections.go
index a144c23..49a15f4 100644
--- a/pkg/config/schema/collections/collections.go
+++ b/pkg/config/schema/collections/collections.go
@@ -49,7 +49,7 @@
Identifier: "SubsetRule",
Group: "networking.dubbo.apache.org",
Kind: "SubsetRule",
- Plural: "destinationrules",
+ Plural: "subsetrules",
Version: "v1",
VersionAliases: []string{},
Proto: "istio.networking.v1alpha3.DestinationRule", StatusProto: "istio.meta.v1alpha1.IstioStatus",
@@ -60,9 +60,9 @@
Builtin: false,
}.MustBuild()
ServiceRoute = collection.Builder{
- Identifier: "serviceRoute",
+ Identifier: "ServiceRoute",
Group: "networking.dubbo.apache.org",
- Kind: "serviceRoute",
+ Kind: "ServiceRoute",
Plural: "serviceroutes",
Version: "v1",
VersionAliases: []string{},
diff --git a/pkg/config/schema/gvk/resources.go b/pkg/config/schema/gvk/resources.go
index cf3828c..a55a3f4 100644
--- a/pkg/config/schema/gvk/resources.go
+++ b/pkg/config/schema/gvk/resources.go
@@ -40,7 +40,7 @@
MeshConfig = config.GroupVersionKind{Group: "", Version: "v1alpha1", Kind: "MeshConfig"}
PeerAuthentication = config.GroupVersionKind{Group: "security.dubbo.apache.org", Version: "v1", Kind: "PeerAuthentication"}
SubsetRule = config.GroupVersionKind{Group: "networking.dubbo.apache.org", Version: "v1", Kind: "SubsetRule"}
- ServiceRoute = config.GroupVersionKind{Group: "networking.dubbo.apache.org", Version: "v1", Kind: "serviceRoute"}
+ ServiceRoute = config.GroupVersionKind{Group: "networking.dubbo.apache.org", Version: "v1", Kind: "ServiceRoute"}
EndpointSlice = config.GroupVersionKind{Group: "discovery.k8s.io", Version: "v1", Kind: "EndpointSlice"}
Endpoints = config.GroupVersionKind{Group: "", Version: "v1", Kind: "Endpoints"}
)
diff --git a/pkg/config/schema/kind/resources.go b/pkg/config/schema/kind/resources.go
index 2fdccc3..602ca51 100644
--- a/pkg/config/schema/kind/resources.go
+++ b/pkg/config/schema/kind/resources.go
@@ -68,7 +68,7 @@
case PeerAuthentication:
return "PeerAuthentication"
case ServiceRoute:
- return "serviceRoute"
+ return "ServiceRoute"
case SubsetRule:
return "SubsetRule"
default:
diff --git a/pkg/config/schema/kubeclient/resources.go b/pkg/config/schema/kubeclient/resources.go
index 6cf9708..8e0ca75 100644
--- a/pkg/config/schema/kubeclient/resources.go
+++ b/pkg/config/schema/kubeclient/resources.go
@@ -20,9 +20,11 @@
import (
"context"
"fmt"
+
"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvr"
"github.com/apache/dubbo-kubernetes/pkg/kube/informerfactory"
ktypes "github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/pkg/util/ptr"
apiistioioapinetworkingv1 "istio.io/client-go/pkg/apis/networking/v1"
apiistioioapisecurityv1 "istio.io/client-go/pkg/apis/security/v1"
@@ -216,25 +218,78 @@
return c.Kube().AdmissionregistrationV1().ValidatingWebhookConfigurations().Watch(context.Background(), options)
}
case gvr.ServiceRoute:
+ // ServiceRoute lives in the networking.dubbo.apache.org API group, not networking.istio.io,
+ // so it is read and watched through the dynamic client.
+ serviceRouteGVR := schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "serviceroutes",
+ }
l = func(options metav1.ListOptions) (runtime.Object, error) {
- return c.Dubbo().NetworkingV1().VirtualServices(opts.Namespace).List(context.Background(), options)
+ return c.Dynamic().Resource(serviceRouteGVR).Namespace(opts.Namespace).List(context.Background(), options)
}
w = func(options metav1.ListOptions) (watch.Interface, error) {
- return c.Dubbo().NetworkingV1().VirtualServices(opts.Namespace).Watch(context.Background(), options)
+ return c.Dynamic().Resource(serviceRouteGVR).Namespace(opts.Namespace).Watch(context.Background(), options)
}
case gvr.SubsetRule:
+ // SubsetRule lives in the networking.dubbo.apache.org API group, not networking.istio.io,
+ // so it is read and watched through the dynamic client.
+ subsetRuleGVR := schema.GroupVersionResource{
+ Group: "networking.dubbo.apache.org",
+ Version: "v1",
+ Resource: "subsetrules",
+ }
l = func(options metav1.ListOptions) (runtime.Object, error) {
- return c.Dubbo().NetworkingV1().DestinationRules(opts.Namespace).List(context.Background(), options)
+ // Log the namespace being watched for diagnosis
+ if opts.Namespace == "" {
+ log.Infof("SubsetRule informer: List called for all namespaces")
+ } else {
+ log.Infof("SubsetRule informer: List called for namespace %s", opts.Namespace)
+ }
+ return c.Dynamic().Resource(subsetRuleGVR).Namespace(opts.Namespace).List(context.Background(), options)
}
w = func(options metav1.ListOptions) (watch.Interface, error) {
- return c.Dubbo().NetworkingV1().DestinationRules(opts.Namespace).Watch(context.Background(), options)
+ // Log the namespace being watched for diagnosis
+ if opts.Namespace == "" {
+ log.Infof("SubsetRule informer: Watch called for all namespaces")
+ } else {
+ log.Infof("SubsetRule informer: Watch called for namespace %s", opts.Namespace)
+ }
+ watchInterface, err := c.Dynamic().Resource(subsetRuleGVR).Namespace(opts.Namespace).Watch(context.Background(), options)
+ if err != nil {
+ log.Errorf("SubsetRule informer: Watch failed: %v", err)
+ } else {
+ log.Infof("SubsetRule informer: Watch connection established successfully")
+ }
+ return watchInterface, err
}
case gvr.PeerAuthentication:
+ peerAuthGVR := schema.GroupVersionResource{
+ Group: "security.dubbo.apache.org",
+ Version: "v1",
+ Resource: "peerauthentications",
+ }
l = func(options metav1.ListOptions) (runtime.Object, error) {
- return c.Dubbo().SecurityV1().PeerAuthentications(opts.Namespace).List(context.Background(), options)
+ if opts.Namespace == "" {
+ log.Infof("PeerAuthentication informer: List called for all namespaces")
+ } else {
+ log.Infof("PeerAuthentication informer: List called for namespace %s", opts.Namespace)
+ }
+ return c.Dynamic().Resource(peerAuthGVR).Namespace(opts.Namespace).List(context.Background(), options)
}
w = func(options metav1.ListOptions) (watch.Interface, error) {
- return c.Dubbo().SecurityV1().PeerAuthentications(opts.Namespace).Watch(context.Background(), options)
+ if opts.Namespace == "" {
+ log.Infof("PeerAuthentication informer: Watch called for all namespaces")
+ } else {
+ log.Infof("PeerAuthentication informer: Watch called for namespace %s", opts.Namespace)
+ }
+ watchInterface, err := c.Dynamic().Resource(peerAuthGVR).Namespace(opts.Namespace).Watch(context.Background(), options)
+ if err != nil {
+ log.Errorf("PeerAuthentication informer: Watch failed: %v", err)
+ } else {
+ log.Infof("PeerAuthentication informer: Watch connection established successfully")
+ }
+ return watchInterface, err
}
case gvr.Pod:
l = func(options metav1.ListOptions) (runtime.Object, error) {
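
For context, a minimal sketch of reading SubsetRules through the Kubernetes dynamic client with the same GVR wired up above. The in-cluster bootstrap and the grpc-app namespace are assumptions; error handling is deliberately minimal.

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/rest"
)

func main() {
    cfg, err := rest.InClusterConfig() // assumes the process runs inside the cluster
    if err != nil {
        panic(err)
    }
    dc, err := dynamic.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }
    subsetRuleGVR := schema.GroupVersionResource{
        Group:    "networking.dubbo.apache.org",
        Version:  "v1",
        Resource: "subsetrules",
    }
    list, err := dc.Resource(subsetRuleGVR).Namespace("grpc-app").List(context.Background(), metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    for _, item := range list.Items {
        fmt.Printf("%s/%s\n", item.GetNamespace(), item.GetName())
    }
}
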
diff --git a/pkg/kube/kclient/client.go b/pkg/kube/kclient/client.go
index eb65ff2..5036302 100644
--- a/pkg/kube/kclient/client.go
+++ b/pkg/kube/kclient/client.go
@@ -20,6 +20,7 @@
import (
"context"
"fmt"
+
"github.com/apache/dubbo-kubernetes/pkg/util/ptr"
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/features"
@@ -39,7 +40,6 @@
"k8s.io/client-go/tools/cache"
"sync"
- "sync/atomic"
dubbolog "github.com/apache/dubbo-kubernetes/pkg/log"
)
@@ -123,36 +123,6 @@
return newDelayedInformer[T](gvr, inf, delay, filter)
}
-func newDelayedInformer[T controllers.ComparableObject](gvr schema.GroupVersionResource, getInf func() informerfactory.StartableInformer, delay kubetypes.DelayedFilter, filter Filter) Informer[T] {
- delayedClient := &delayedClient[T]{
- inf: new(atomic.Pointer[Informer[T]]),
- delayed: delay,
- }
-
- // If resource is not yet known, we will use the delayedClient.
- // When the resource is later loaded, the callback will trigger and swap our dummy delayedClient
- // with a full client
- readyNow := delay.KnownOrCallback(func(stop <-chan struct{}) {
- // The inf() call is responsible for starting the informer
- inf := getInf()
- fc := &informerClient[T]{
- informer: inf.Informer,
- startInformer: inf.Start,
- }
- applyDynamicFilter(filter, gvr, fc)
- inf.Start(stop)
- log.Infof("%v is now ready, building client", gvr.GroupResource())
- // Swap out the dummy client with the full one
- delayedClient.set(fc)
- })
- if !readyNow {
- log.Debugf("%v is not ready now, building delayed client", gvr.GroupResource())
- return delayedClient
- }
- log.Debugf("%v ready now, building client", gvr.GroupResource())
- return newInformerClient[T](gvr, getInf(), filter)
-}
-
func NewFiltered[T controllers.ComparableObject](c kube.Client, filter Filter) Client[T] {
gvr := types.MustToGVR[T](types.MustGVKFromType[T]())
inf := kubeclient.GetInformerFiltered[T](c, ToOpts(c, gvr, filter), gvr)
@@ -179,25 +149,35 @@
filter.ObjectFilter.AddHandler(func(added, removed sets.String) {
ic.handlerMu.RLock()
defer ic.handlerMu.RUnlock()
+ log.Infof("applyDynamicFilter: namespace filter handler triggered for %v: added=%v, removed=%v", gvr, added, removed)
if gvr == dubbogvr.Namespace {
for _, item := range ic.ListUnfiltered(metav1.NamespaceAll, klabels.Everything()) {
if !added.Contains(item.GetName()) {
continue
}
+ log.Infof("applyDynamicFilter: triggering OnAdd for namespace %s", item.GetName())
for _, c := range ic.registeredHandlers {
c.handler.OnAdd(item, false)
}
}
} else {
for ns := range added {
- for _, item := range ic.ListUnfiltered(ns, klabels.Everything()) {
+ log.Infof("applyDynamicFilter: namespace %s added, listing unfiltered objects for %v", ns, gvr)
+ items := ic.ListUnfiltered(ns, klabels.Everything())
+ log.Infof("applyDynamicFilter: found %d unfiltered objects in namespace %s for %v", len(items), ns, gvr)
+ for _, item := range items {
+ log.Infof("applyDynamicFilter: triggering OnAdd for %s/%s in namespace %s", item.GetNamespace(), item.GetName(), ns)
for _, c := range ic.registeredHandlers {
c.handler.OnAdd(item, false)
}
}
}
for ns := range removed {
- for _, item := range ic.ListUnfiltered(ns, klabels.Everything()) {
+ log.Infof("applyDynamicFilter: namespace %s removed, listing unfiltered objects for %v", ns, gvr)
+ items := ic.ListUnfiltered(ns, klabels.Everything())
+ log.Infof("applyDynamicFilter: found %d unfiltered objects in namespace %s for %v", len(items), ns, gvr)
+ for _, item := range items {
+ log.Infof("applyDynamicFilter: triggering OnDelete for %s/%s in namespace %s", item.GetNamespace(), item.GetName(), ns)
for _, c := range ic.registeredHandlers {
c.handler.OnDelete(item)
}
@@ -210,15 +190,32 @@
func (n *informerClient[T]) List(namespace string, selector klabels.Selector) []T {
var res []T
+ var filteredCount int
+ var totalCount int
err := cache.ListAllByNamespace(n.informer.GetIndexer(), namespace, selector, func(i any) {
+ totalCount++
cast := i.(T)
if n.applyFilter(cast) {
res = append(res, cast)
+ } else {
+ filteredCount++
+ // Log filtered objects to help diagnose
+ if objWithNs, ok := any(cast).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ log.Debugf("informerClient.List: filtered out object %s/%s for namespace=%s", objWithNs.GetNamespace(), objWithNs.GetName(), namespace)
+ }
}
})
if err != nil {
- fmt.Printf("lister returned err for %v: %v", namespace, err)
+ log.Warnf("informerClient.List: lister returned err for namespace=%s: %v", namespace, err)
+ }
+ if namespace == metav1.NamespaceAll {
+ log.Infof("informerClient.List: namespace=%s, total=%d, filtered=%d, result=%d", namespace, totalCount, filteredCount, len(res))
+ } else if filteredCount > 0 {
+ log.Debugf("informerClient.List: filtered out %d items for namespace=%s (total=%d, result=%d)", filteredCount, namespace, totalCount, len(res))
}
return res
}
@@ -231,7 +228,20 @@
})
if err != nil {
- fmt.Printf("lister returned err for %v: %v", namespace, err)
+ log.Warnf("informerClient.ListUnfiltered: lister returned err for namespace=%s: %v", namespace, err)
+ }
+ if namespace == metav1.NamespaceAll {
+ log.Infof("informerClient.ListUnfiltered: found %d unfiltered objects for namespace=%s (synced=%v)", len(res), namespace, n.informer.HasSynced())
+ if len(res) > 0 {
+ for i, obj := range res {
+ if objWithNs, ok := any(obj).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ log.Infof("informerClient.ListUnfiltered: object[%d] %s/%s", i, objWithNs.GetNamespace(), objWithNs.GetName())
+ }
+ }
+ }
}
return res
}
@@ -285,12 +295,66 @@
func (n *informerClient[T]) AddEventHandler(h cache.ResourceEventHandler) cache.ResourceEventHandlerRegistration {
fh := cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
+ var nameStr, nsStr string
+ if objWithNs, ok := any(obj).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ nsStr = objWithNs.GetNamespace()
+ nameStr = objWithNs.GetName()
+ }
if n.filter == nil {
+ log.Debugf("informerClient.AddEventHandler: FilterFunc allowing object %s/%s (no filter)", nsStr, nameStr)
return true
}
- return n.filter(obj)
+ cast := obj.(T)
+ allowed := n.filter(cast)
+ if !allowed {
+ // Log when objects are filtered out to help diagnose missing events
+ log.Infof("informerClient.AddEventHandler: FilterFunc filtered out object %s/%s", nsStr, nameStr)
+ } else {
+ log.Debugf("informerClient.AddEventHandler: FilterFunc allowing object %s/%s", nsStr, nameStr)
+ }
+ return allowed
},
- Handler: h,
+ Handler: cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ var nameStr, nsStr string
+ if objWithNs, ok := any(obj).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ nsStr = objWithNs.GetNamespace()
+ nameStr = objWithNs.GetName()
+ }
+ log.Infof("informerClient.AddEventHandler: OnAdd called for %s/%s", nsStr, nameStr)
+ h.OnAdd(obj, false)
+ },
+ UpdateFunc: func(oldObj, newObj interface{}) {
+ var nameStr, nsStr string
+ if objWithNs, ok := any(newObj).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ nsStr = objWithNs.GetNamespace()
+ nameStr = objWithNs.GetName()
+ }
+ log.Infof("informerClient.AddEventHandler: OnUpdate called for %s/%s", nsStr, nameStr)
+ h.OnUpdate(oldObj, newObj)
+ },
+ DeleteFunc: func(obj interface{}) {
+ var nameStr, nsStr string
+ if objWithNs, ok := any(obj).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ nsStr = objWithNs.GetNamespace()
+ nameStr = objWithNs.GetName()
+ }
+ log.Infof("informerClient.AddEventHandler: OnDelete called for %s/%s", nsStr, nameStr)
+ h.OnDelete(obj)
+ },
+ },
}
n.handlerMu.Lock()
defer n.handlerMu.Unlock()
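
The wrapper above follows client-go's filtering/handler composition. A self-contained sketch of that pattern (assuming a client-go version with the two-argument OnAdd, as used in this patch), driven manually rather than by an informer:

package main

import (
    "fmt"

    "k8s.io/client-go/tools/cache"
)

func main() {
    // A filter decides whether an event is delivered at all, and a logging shim
    // wraps the real handler, mirroring the structure of AddEventHandler above.
    inner := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) { fmt.Println("inner OnAdd:", obj) },
    }
    wrapped := cache.FilteringResourceEventHandler{
        FilterFunc: func(obj interface{}) bool {
            s, ok := obj.(string)
            return ok && s != "filtered-out" // stand-in for the namespace filter
        },
        Handler: cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                fmt.Println("logging shim OnAdd:", obj) // mirrors the Infof wrappers above
                inner.OnAdd(obj, false)
            },
        },
    }
    wrapped.OnAdd("allowed-object", false)
    wrapped.OnAdd("filtered-out", false) // dropped by FilterFunc, inner handler never sees it
}
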
diff --git a/pkg/kube/kclient/delayed.go b/pkg/kube/kclient/delayed.go
index 4c7b35e..5278783 100644
--- a/pkg/kube/kclient/delayed.go
+++ b/pkg/kube/kclient/delayed.go
@@ -18,11 +18,13 @@
package kclient
import (
- "github.com/apache/dubbo-kubernetes/pkg/util/ptr"
"sync"
"sync/atomic"
+ "github.com/apache/dubbo-kubernetes/pkg/util/ptr"
+
"github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/informerfactory"
"github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
"github.com/apache/dubbo-kubernetes/pkg/slices"
klabels "k8s.io/apimachinery/pkg/labels"
@@ -89,6 +91,8 @@
s.hm.Lock()
defer s.hm.Unlock()
for _, h := range s.handlers {
+ // h is a delayedHandler which embeds ResourceEventHandler, so we can pass it directly
+ // This matches Istio's implementation
reg := inf.AddEventHandler(h)
h.hasSynced.hasSynced.Store(ptr.Of(reg.HasSynced))
}
@@ -210,3 +214,43 @@
// Not ready yet, return nil
return nil
}
+
+func newDelayedInformer[T controllers.ComparableObject](
+ gvr schema.GroupVersionResource,
+ getInf func() informerfactory.StartableInformer,
+ delay kubetypes.DelayedFilter,
+ filter Filter,
+) Informer[T] {
+ delayedClient := &delayedClient[T]{
+ inf: new(atomic.Pointer[Informer[T]]),
+ delayed: delay,
+ }
+
+ // If resource is not yet known, we will use the delayedClient.
+ // When the resource is later loaded, the callback will trigger and swap our dummy delayedClient
+ // with a full client
+ readyNow := delay.KnownOrCallback(func(stop <-chan struct{}) {
+ // The inf() call is responsible for starting the informer
+ inf := getInf()
+ fc := &informerClient[T]{
+ informer: inf.Informer,
+ startInformer: inf.Start,
+ }
+ applyDynamicFilter(filter, gvr, fc)
+ // Swap out the dummy client with the full one BEFORE starting the informer
+ // This ensures handlers are registered before the informer starts syncing
+ delayedClient.set(fc)
+ inf.Start(stop)
+ log.Infof("%v is now ready, building client", gvr.GroupResource())
+ })
+ if !readyNow {
+ log.Debugf("%v is not ready now, building delayed client", gvr.GroupResource())
+ return delayedClient
+ }
+ log.Debugf("%v ready now, building client", gvr.GroupResource())
+ // When readyNow is true, we need to ensure the informer is registered with the factory
+ // so InformerFactory.Start() can pick it up.
+ inf := getInf()
+ fc := newInformerClient[T](gvr, inf, filter)
+ return fc
+}
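
A toy model of the ordering this change enforces: handlers must be attached before the informer starts, otherwise they miss the initial Add events. The informer here is a stand-in type, not the repo's informer.

package main

import "fmt"

// toyInformer delivers each stored object to the handlers registered at the time it starts.
type toyInformer struct {
    handlers []func(string)
    store    []string
}

func (i *toyInformer) AddHandler(h func(string)) { i.handlers = append(i.handlers, h) }

func (i *toyInformer) Start(objects []string) {
    for _, obj := range objects {
        i.store = append(i.store, obj)
        for _, h := range i.handlers {
            h(obj) // initial sync events are only seen by already-registered handlers
        }
    }
}

func main() {
    inf := &toyInformer{}
    inf.AddHandler(func(obj string) { fmt.Println("OnAdd:", obj) }) // registered before Start
    inf.Start([]string{"grpc-app/consumer-route"})                  // handler sees the existing object
}
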
diff --git a/pkg/kube/krt/informer.go b/pkg/kube/krt/informer.go
index 0ddefe3..120ebae 100644
--- a/pkg/kube/krt/informer.go
+++ b/pkg/kube/krt/informer.go
@@ -19,6 +19,7 @@
import (
"fmt"
+
"github.com/apache/dubbo-kubernetes/pkg/util/ptr"
"github.com/apache/dubbo-kubernetes/pkg/kube"
@@ -155,7 +156,44 @@
func (i *informer[I]) RegisterBatch(f func(o []Event[I]), runExistingState bool) HandlerRegistration {
synced := i.inf.AddEventHandler(informerEventHandler[I](func(o Event[I], initialSync bool) {
- f([]Event[I]{o})
+ // Only process events if runExistingState is true OR this is not an initial sync event
+ // This matches Istio's behavior: runExistingState=false means skip initial sync events
+ if runExistingState || !initialSync {
+ // Log all events to help diagnose missing events
+ var nameStr, nsStr string
+ if o.New != nil {
+ if objWithNs, ok := any(*o.New).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ nsStr = objWithNs.GetNamespace()
+ nameStr = objWithNs.GetName()
+ }
+ } else if o.Old != nil {
+ if objWithNs, ok := any(*o.Old).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ nsStr = objWithNs.GetNamespace()
+ nameStr = objWithNs.GetName()
+ }
+ }
+ log.Debugf("informer.RegisterBatch: processing event %s for %s/%s (initialSync=%v, runExistingState=%v)", o.Event, nsStr, nameStr, initialSync, runExistingState)
+ f([]Event[I]{o})
+ } else {
+ // Log skipped events to help diagnose
+ var nameStr, nsStr string
+ if o.New != nil {
+ if objWithNs, ok := any(*o.New).(interface {
+ GetNamespace() string
+ GetName() string
+ }); ok {
+ nsStr = objWithNs.GetNamespace()
+ nameStr = objWithNs.GetName()
+ }
+ }
+ log.Debugf("informer.RegisterBatch: skipping initial sync event for %s/%s (initialSync=%v, runExistingState=%v)", nsStr, nameStr, initialSync, runExistingState)
+ }
}))
base := i.baseSyncer
handler := pollSyncer{
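
A minimal sketch of the dispatch rule introduced above: initial-sync events are delivered only when the caller asked to replay existing state, while live updates always go through.

package main

import "fmt"

// shouldDeliver mirrors the condition used in RegisterBatch above.
func shouldDeliver(runExistingState, initialSync bool) bool {
    return runExistingState || !initialSync
}

func main() {
    fmt.Println(shouldDeliver(false, true))  // false: replayed state is skipped
    fmt.Println(shouldDeliver(false, false)) // true: live update, always delivered
    fmt.Println(shouldDeliver(true, true))   // true: replay was requested
}
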
diff --git a/pkg/kube/namespace/filter.go b/pkg/kube/namespace/filter.go
index b0f60c7..0e59b30 100644
--- a/pkg/kube/namespace/filter.go
+++ b/pkg/kube/namespace/filter.go
@@ -107,10 +107,12 @@
// When an object is deleted, obj could be a DeletionFinalStateUnknown marker item.
ns, ok := extractObjectNamespace(obj)
if !ok {
+ log.Debugf("discoveryNamespacesFilter.Filter: failed to extract namespace from object, rejecting")
return false
}
if ns == "" {
// Cluster scoped resources. Always included
+ log.Debugf("discoveryNamespacesFilter.Filter: cluster-scoped resource, allowing")
return true
}
@@ -118,11 +120,18 @@
defer d.lock.RUnlock()
// permit all objects if discovery selectors are not specified
if len(d.discoverySelectors) == 0 {
+ log.Debugf("discoveryNamespacesFilter.Filter: no discovery selectors, allowing namespace %s", ns)
return true
}
// permit if object resides in a namespace labeled for discovery
- return d.discoveryNamespaces.Contains(ns)
+ allowed := d.discoveryNamespaces.Contains(ns)
+ if !allowed {
+ log.Infof("discoveryNamespacesFilter.Filter: namespace %s not in discoveryNamespaces (selectors=%d, namespaces=%v), rejecting", ns, len(d.discoverySelectors), d.discoveryNamespaces)
+ } else {
+ log.Debugf("discoveryNamespacesFilter.Filter: namespace %s in discoveryNamespaces, allowing", ns)
+ }
+ return allowed
}
// AddHandler registers a handler on namespace, which will be triggered when namespace selected or deselected.
diff --git a/pkg/xds/server.go b/pkg/xds/server.go
index 5a2de13..bf5e126 100644
--- a/pkg/xds/server.go
+++ b/pkg/xds/server.go
@@ -192,6 +192,13 @@
// Connection logged in initConnection() after addCon() to ensure accurate counting
}
+ resourceNamesStr := " [wildcard]"
+ if len(req.ResourceNames) > 0 {
+ resourceNamesStr = " [" + strings.Join(req.ResourceNames, ", ") + "]"
+ }
+ log.Infof("%s: RAW REQ %s resources:%d nonce:%s%s",
+ model.GetShortType(req.TypeUrl), con.conID, len(req.ResourceNames), req.ResponseNonce, resourceNamesStr)
+
select {
case con.reqChan <- req:
case <-con.stream.Context().Done():
@@ -316,6 +323,32 @@
// A nonce becomes stale following a newer nonce being sent to Envoy.
// previousInfo.NonceSent can be empty if we previously had shouldRespond=true but didn't send any resources.
if request.ResponseNonce != previousInfo.NonceSent {
+ newResources := sets.New(request.ResourceNames...)
+ // Special-case proxyless gRPC: the xDS client sends a "stale" nonce when it changes
+ // its subscription (e.g., after a ServiceRoute introduces subset clusters). Treat this
+ // as a resource change rather than an expired request so the new clusters get a response.
+ previousResourcesCopy := previousInfo.ResourceNames.Copy()
+ if !newResources.Equals(previousResourcesCopy) && len(newResources) > 0 {
+ log.Infof("%s: REQ %s nonce mismatch (got %s, sent %s) but resources changed -> responding",
+ stype, id, request.ResponseNonce, previousInfo.NonceSent)
+ added := newResources.Difference(previousResourcesCopy)
+ w.UpdateWatchedResource(request.TypeUrl, func(wr *WatchedResource) *WatchedResource {
+ if wr == nil {
+ return nil
+ }
+ wr.LastError = ""
+ wr.ResourceNames = newResources
+ // keep previous nonce so the subsequent ACK can match
+ return wr
+ })
+ // Respond even when the set difference is empty (e.g., only a removal happened) so the
+ // client still receives an update for its changed subscription.
+ return true, ResourceDelta{Subscribed: added}
+ }
+
// Expired/stale nonce - don't respond, just log at debug level
if previousInfo.NonceSent == "" {
// We never sent a nonce, but client sent one - this is unusual but treat as expired
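
A simplified model of the decision added above: a request with a stale nonce is normally ignored, but if its (non-empty) resource names differ from what is currently watched, it is treated as a new subscription and answered. Plain maps stand in for the repo's sets; the cluster names are illustrative.

package main

import "fmt"

// respondOnStaleNonce sketches only the special case added above: a stale nonce with
// changed, non-empty resource names is answered instead of being dropped.
func respondOnStaleNonce(previous, requested []string) bool {
    if len(requested) == 0 {
        return false
    }
    prev := map[string]bool{}
    for _, r := range previous {
        prev[r] = true
    }
    if len(requested) != len(previous) {
        return true
    }
    for _, r := range requested {
        if !prev[r] {
            return true
        }
    }
    return false
}

func main() {
    previous := []string{"outbound|17070||consumer.grpc-app.svc.cluster.local"}
    requested := append(previous, "outbound|17070|v1|consumer.grpc-app.svc.cluster.local")
    fmt.Println(respondOnStaleNonce(previous, requested)) // true: a new subset cluster was subscribed
}
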
diff --git a/samples/grpc-app/grpc-app.yaml b/samples/grpc-app/grpc-app.yaml
index 506b640..cb3bef5 100644
--- a/samples/grpc-app/grpc-app.yaml
+++ b/samples/grpc-app/grpc-app.yaml
@@ -32,13 +32,14 @@
apiVersion: apps/v1
kind: Deployment
metadata:
- name: consumer
+ name: consumer-v1
namespace: grpc-app
spec:
replicas: 2
selector:
matchLabels:
app: consumer
+ version: v1
template:
metadata:
annotations:
@@ -47,6 +48,7 @@
proxy.dubbo.apache.org/config: '{"holdApplicationUntilProxyStarts": true}'
labels:
app: consumer
+ version: v1
spec:
containers:
- name: app
@@ -62,6 +64,78 @@
fieldRef:
apiVersion: v1
fieldPath: status.podIP
+ - name: SERVICE_VERSION
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.labels['version']
+ - name: SERVICE_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: SERVICE_PORT
+ value: "17070"
+ readinessProbe:
+ tcpSocket:
+ port: 17070
+ initialDelaySeconds: 5
+ periodSeconds: 5
+ timeoutSeconds: 2
+ successThreshold: 1
+ failureThreshold: 3
+ livenessProbe:
+ tcpSocket:
+ port: 17070
+ initialDelaySeconds: 10
+ periodSeconds: 10
+ timeoutSeconds: 2
+ successThreshold: 1
+ failureThreshold: 3
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: consumer-v2
+ namespace: grpc-app
+spec:
+ replicas: 2
+ selector:
+ matchLabels:
+ app: consumer
+ version: v2
+ template:
+ metadata:
+ annotations:
+ proxyless.dubbo.apache.org/inject: "true"
+ inject.dubbo.apache.org/templates: grpc-agent
+ proxy.dubbo.apache.org/config: '{"holdApplicationUntilProxyStarts": true}'
+ labels:
+ app: consumer
+ version: v2
+ spec:
+ containers:
+ - name: app
+ image: mfordjody/grpc-consumer:dev-debug
+ imagePullPolicy: Always
+ ports:
+ - containerPort: 17070
+ protocol: TCP
+ name: grpc
+ env:
+ - name: INSTANCE_IP
+ valueFrom:
+ fieldRef:
+ apiVersion: v1
+ fieldPath: status.podIP
+ - name: SERVICE_VERSION
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.labels['version']
+ - name: SERVICE_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: SERVICE_PORT
+ value: "17070"
readinessProbe:
tcpSocket:
port: 17070
diff --git a/tests/grpc-app/consumer/main.go b/tests/grpc-app/consumer/main.go
index 8e850ed..89da67e 100644
--- a/tests/grpc-app/consumer/main.go
+++ b/tests/grpc-app/consumer/main.go
@@ -26,6 +26,7 @@
"os"
"os/signal"
"regexp"
+ "strconv"
"strings"
"syscall"
"time"
@@ -47,7 +48,12 @@
type echoServer struct {
pb.UnimplementedEchoServiceServer
pb.UnimplementedEchoTestServiceServer
- hostname string
+ hostname string
+ serviceVersion string
+ namespace string
+ instanceIP string
+ cluster string
+ servicePort int
}
func (s *echoServer) Echo(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
@@ -56,8 +62,13 @@
}
log.Printf("Received: %v", req.Message)
return &pb.EchoResponse{
- Message: req.Message,
- Hostname: s.hostname,
+ Message: req.Message,
+ Hostname: s.hostname,
+ ServiceVersion: s.serviceVersion,
+ Namespace: s.namespace,
+ Ip: s.instanceIP,
+ Cluster: s.cluster,
+ ServicePort: int32(s.servicePort),
}, nil
}
@@ -98,7 +109,14 @@
output := make([]string, 0, count)
for i := int32(0); i < count; i++ {
- line := fmt.Sprintf("[%d body] Hostname=%s", i, s.hostname)
+ line := fmt.Sprintf("[%d body] Hostname=%s ServiceVersion=%s ServicePort=%d Namespace=%s",
+ i, s.hostname, s.serviceVersion, s.servicePort, s.namespace)
+ if s.instanceIP != "" {
+ line += fmt.Sprintf(" IP=%s", s.instanceIP)
+ }
+ if s.cluster != "" {
+ line += fmt.Sprintf(" Cluster=%s", s.cluster)
+ }
output = append(output, line)
}
@@ -274,6 +292,15 @@
}
}
+func firstNonEmpty(values ...string) string {
+ for _, v := range values {
+ if strings.TrimSpace(v) != "" {
+ return v
+ }
+ }
+ return ""
+}
+
func main() {
flag.Parse()
@@ -288,6 +315,24 @@
hostname = "unknown"
}
+ namespace := firstNonEmpty(os.Getenv("SERVICE_NAMESPACE"), os.Getenv("POD_NAMESPACE"), "default")
+ serviceVersion := firstNonEmpty(
+ os.Getenv("SERVICE_VERSION"),
+ os.Getenv("POD_VERSION"),
+ os.Getenv("VERSION"),
+ )
+ if serviceVersion == "" {
+ serviceVersion = "unknown"
+ }
+ cluster := os.Getenv("SERVICE_CLUSTER")
+ instanceIP := os.Getenv("INSTANCE_IP")
+ servicePort := *port
+ if sp := os.Getenv("SERVICE_PORT"); sp != "" {
+ if parsed, err := strconv.Atoi(sp); err == nil {
+ servicePort = parsed
+ }
+ }
+
// Get bootstrap file path from environment variable or use default
bootstrapPath := os.Getenv("GRPC_XDS_BOOTSTRAP")
if bootstrapPath == "" {
@@ -315,7 +360,14 @@
log.Fatalf("Failed to create xDS gRPC server: %v", err)
}
- es := &echoServer{hostname: hostname}
+ es := &echoServer{
+ hostname: hostname,
+ serviceVersion: serviceVersion,
+ namespace: namespace,
+ instanceIP: instanceIP,
+ cluster: cluster,
+ servicePort: servicePort,
+ }
pb.RegisterEchoServiceServer(server, es)
pb.RegisterEchoTestServiceServer(server, es)
// Enable reflection API for grpcurl to discover services
diff --git a/tests/grpc-app/producer/main.go b/tests/grpc-app/producer/main.go
index 9916f44..8bb2319 100644
--- a/tests/grpc-app/producer/main.go
+++ b/tests/grpc-app/producer/main.go
@@ -525,8 +525,29 @@
continue
}
- log.Printf("ForwardEcho: request %d succeeded: Hostname=%s", i+1, resp.Hostname)
- output = append(output, fmt.Sprintf("[%d body] Hostname=%s", i, resp.Hostname))
+ log.Printf("ForwardEcho: request %d succeeded: Hostname=%s ServiceVersion=%s Namespace=%s IP=%s",
+ i+1, resp.Hostname, resp.ServiceVersion, resp.Namespace, resp.Ip)
+
+ lineParts := []string{
+ fmt.Sprintf("[%d body] Hostname=%s", i, resp.Hostname),
+ }
+ if resp.ServiceVersion != "" {
+ lineParts = append(lineParts, fmt.Sprintf("ServiceVersion=%s", resp.ServiceVersion))
+ }
+ if resp.Namespace != "" {
+ lineParts = append(lineParts, fmt.Sprintf("Namespace=%s", resp.Namespace))
+ }
+ if resp.Ip != "" {
+ lineParts = append(lineParts, fmt.Sprintf("IP=%s", resp.Ip))
+ }
+ if resp.Cluster != "" {
+ lineParts = append(lineParts, fmt.Sprintf("Cluster=%s", resp.Cluster))
+ }
+ if resp.ServicePort > 0 {
+ lineParts = append(lineParts, fmt.Sprintf("ServicePort=%d", resp.ServicePort))
+ }
+
+ output = append(output, strings.Join(lineParts, " "))
// Small delay between successful requests to avoid overwhelming the server
if i < count-1 {
diff --git a/tests/grpc-app/proto/echo.proto b/tests/grpc-app/proto/echo.proto
index c0bd61e..1d7c32e 100644
--- a/tests/grpc-app/proto/echo.proto
+++ b/tests/grpc-app/proto/echo.proto
@@ -41,6 +41,11 @@
message EchoResponse {
string message = 1;
string hostname = 2;
+ string service_version = 3;
+ string namespace = 4;
+ string ip = 5;
+ string cluster = 6;
+ int32 service_port = 7;
}
message ForwardEchoRequest {