blob: c8175b9702a7b3ff4f4adb6fdf432507deff3eb7 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
"github.com/hashicorp/go-multierror"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube/controller/filter"
"github.com/apache/dubbo-go-pixiu/pkg/config/mesh"
kubelib "github.com/apache/dubbo-go-pixiu/pkg/kube"
)
// initialize handlers for discovery selection scoping
func (c *Controller) initDiscoveryHandlers(
kubeClient kubelib.Client,
endpointMode EndpointMode,
meshWatcher mesh.Watcher,
discoveryNamespacesFilter filter.DiscoveryNamespacesFilter,
) {
c.initDiscoveryNamespaceHandlers(kubeClient, endpointMode, discoveryNamespacesFilter)
c.initMeshWatcherHandler(kubeClient, endpointMode, meshWatcher, discoveryNamespacesFilter)
}
// handle discovery namespace membership changes triggered by namespace events,
// which requires triggering create/delete event handlers for services, pods, and endpoints,
// and updating the DiscoveryNamespacesFilter.
func (c *Controller) initDiscoveryNamespaceHandlers(
kubeClient kubelib.Client,
endpointMode EndpointMode,
discoveryNamespacesFilter filter.DiscoveryNamespacesFilter,
) {
otype := "Namespaces"
c.nsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
incrementEvent(otype, "add")
ns := obj.(*v1.Namespace)
if discoveryNamespacesFilter.NamespaceCreated(ns.ObjectMeta) {
c.queue.Push(func() error {
c.handleSelectedNamespace(endpointMode, ns.Name)
return nil
})
}
},
UpdateFunc: func(old, new interface{}) {
incrementEvent(otype, "update")
oldNs := old.(*v1.Namespace)
newNs := new.(*v1.Namespace)
membershipChanged, namespaceAdded := discoveryNamespacesFilter.NamespaceUpdated(oldNs.ObjectMeta, newNs.ObjectMeta)
if membershipChanged {
var handleFunc func() error
if namespaceAdded {
handleFunc = func() error {
c.handleSelectedNamespace(endpointMode, newNs.Name)
return nil
}
} else {
handleFunc = func() error {
c.handleDeselectedNamespace(kubeClient, endpointMode, newNs.Name)
return nil
}
}
c.queue.Push(handleFunc)
}
},
DeleteFunc: func(obj interface{}) {
incrementEvent(otype, "delete")
ns, ok := obj.(*v1.Namespace)
if !ok {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
if cast, ok := tombstone.Obj.(*v1.Namespace); ok {
ns = cast
} else {
log.Errorf("Failed to convert to tombstoned namespace object: %v", obj)
return
}
} else {
log.Errorf("Failed to convert to namespace object: %v", obj)
return
}
}
discoveryNamespacesFilter.NamespaceDeleted(ns.ObjectMeta)
// no need to invoke object handlers since objects within the namespace will trigger delete events
},
})
}
// handle discovery namespace membership changes triggered by changes to meshConfig's discovery selectors
// which requires updating the DiscoveryNamespaceFilter and triggering create/delete event handlers for services/pods/endpoints
// for membership changes
func (c *Controller) initMeshWatcherHandler(
kubeClient kubelib.Client,
endpointMode EndpointMode,
meshWatcher mesh.Watcher,
discoveryNamespacesFilter filter.DiscoveryNamespacesFilter,
) {
meshWatcher.AddMeshHandler(func() {
newSelectedNamespaces, deselectedNamespaces := discoveryNamespacesFilter.SelectorsChanged(meshWatcher.Mesh().GetDiscoverySelectors())
for _, nsName := range newSelectedNamespaces {
nsName := nsName // need to shadow variable to ensure correct value when evaluated inside the closure below
c.queue.Push(func() error {
c.handleSelectedNamespace(endpointMode, nsName)
return nil
})
}
for _, nsName := range deselectedNamespaces {
nsName := nsName // need to shadow variable to ensure correct value when evaluated inside the closure below
c.queue.Push(func() error {
c.handleDeselectedNamespace(kubeClient, endpointMode, nsName)
return nil
})
}
})
}
// issue create events for all services, pods, and endpoints in the newly labeled namespace
func (c *Controller) handleSelectedNamespace(endpointMode EndpointMode, ns string) {
var errs *multierror.Error
// for each resource type, issue create events for objects in the labeled namespace
services, err := c.serviceLister.Services(ns).List(labels.Everything())
if err != nil {
log.Errorf("error listing services: %v", err)
return
}
for _, svc := range services {
errs = multierror.Append(errs, c.onServiceEvent(svc, model.EventAdd))
}
pods, err := listerv1.NewPodLister(c.pods.informer.GetIndexer()).Pods(ns).List(labels.Everything())
if err != nil {
log.Errorf("error listing pods: %v", err)
return
}
for _, pod := range pods {
errs = multierror.Append(errs, c.pods.onEvent(pod, model.EventAdd))
}
switch endpointMode {
case EndpointsOnly:
endpoints, err := listerv1.NewEndpointsLister(c.endpoints.getInformer().GetIndexer()).Endpoints(ns).List(labels.Everything())
if err != nil {
log.Errorf("error listing endpoints: %v", err)
return
}
for _, ep := range endpoints {
errs = multierror.Append(errs, c.endpoints.onEvent(ep, model.EventAdd))
}
case EndpointSliceOnly:
endpointSlices, err := c.endpoints.(*endpointSliceController).listSlices(ns, labels.Everything())
if err != nil {
log.Errorf("error listing endpoint slices: %v", err)
return
}
for _, ep := range endpointSlices {
errs = multierror.Append(errs, c.endpoints.onEvent(ep, model.EventAdd))
}
}
if err := multierror.Flatten(errs.ErrorOrNil()); err != nil {
log.Errorf("one or more errors while handling newly labeled discovery namespace %s: %v", ns, err)
}
}
// issue delete events for all services, pods, and endpoints in the delabled namespace
// use kubeClient.KubeInformer() to bypass filter in order to list resources from non-labeled namespace,
// which fetches informers from the SharedInformerFactory cache (i.e. does not instantiate a new informer)
func (c *Controller) handleDeselectedNamespace(kubeClient kubelib.Client, endpointMode EndpointMode, ns string) {
var errs *multierror.Error
// for each resource type, issue delete events for objects in the delabled namespace
services, err := kubeClient.KubeInformer().Core().V1().Services().Lister().Services(ns).List(labels.Everything())
if err != nil {
log.Errorf("error listing services: %v", err)
return
}
for _, svc := range services {
errs = multierror.Append(errs, c.onServiceEvent(svc, model.EventDelete))
}
pods, err := kubeClient.KubeInformer().Core().V1().Pods().Lister().Pods(ns).List(labels.Everything())
if err != nil {
log.Errorf("error listing pods: %v", err)
return
}
for _, pod := range pods {
errs = multierror.Append(errs, c.pods.onEvent(pod, model.EventDelete))
}
switch endpointMode {
case EndpointsOnly:
endpoints, err := kubeClient.KubeInformer().Core().V1().Endpoints().Lister().Endpoints(ns).List(labels.Everything())
if err != nil {
log.Errorf("error listing endpoints: %v", err)
return
}
for _, ep := range endpoints {
errs = multierror.Append(errs, c.endpoints.onEvent(ep, model.EventDelete))
}
case EndpointSliceOnly:
endpointSlices, err := c.endpoints.(*endpointSliceController).listSlices(ns, labels.Everything())
if err != nil {
log.Errorf("error listing endpoint slices: %v", err)
return
}
for _, ep := range endpointSlices {
errs = multierror.Append(errs, c.endpoints.onEvent(ep, model.EventDelete))
}
}
if err := multierror.Flatten(errs.ErrorOrNil()); err != nil {
log.Errorf("one or more errors while handling delabeled discovery namespace %s: %v", ns, err)
}
}