blob: d78486230d347b50e67c63f34a6996be37a3a96f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package events
import (
"context"
"fmt"
)
import (
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
)
import (
"github.com/apache/dubbo-kubernetes/pkg/core"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
core_registry "github.com/apache/dubbo-kubernetes/pkg/core/resources/registry"
"github.com/apache/dubbo-kubernetes/pkg/core/runtime/component"
"github.com/apache/dubbo-kubernetes/pkg/events"
dubbo_v1alpha1 "github.com/apache/dubbo-kubernetes/pkg/plugins/resources/k8s/native/api/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/plugins/resources/k8s/native/pkg/model"
k8s_registry "github.com/apache/dubbo-kubernetes/pkg/plugins/resources/k8s/native/pkg/registry"
)
var log = core.Log.WithName("k8s-event-listener")
type listener struct {
mgr manager.Manager
out events.Emitter
}
func NewListener(mgr manager.Manager, out events.Emitter) component.Component {
return &listener{
mgr: mgr,
out: out,
}
}
func (k *listener) Start(stop <-chan struct{}) error {
types := core_registry.Global().ObjectTypes()
knownTypes := k.mgr.GetScheme().KnownTypes(dubbo_v1alpha1.GroupVersion)
for _, t := range types {
if _, ok := knownTypes[string(t)]; !ok {
continue
}
gvk := dubbo_v1alpha1.GroupVersion.WithKind(string(t))
lw, err := k.createListerWatcher(gvk)
if err != nil {
return err
}
coreObj, err := core_registry.Global().NewObject(t)
if err != nil {
return err
}
obj, err := k8s_registry.Global().NewObject(coreObj.GetSpec())
if err != nil {
return err
}
informer := cache.NewSharedInformer(lw, obj, 0)
if _, err := informer.AddEventHandler(k); err != nil {
return err
}
go func(typ core_model.ResourceType) {
log.V(1).Info("start watching resource", "type", typ)
informer.Run(stop)
}(t)
}
return nil
}
func resourceKey(obj model.KubernetesObject) core_model.ResourceKey {
var name string
if obj.Scope() == model.ScopeCluster {
name = obj.GetName()
} else {
name = fmt.Sprintf("%s.%s", obj.GetName(), obj.GetNamespace())
}
return core_model.ResourceKey{
Name: name,
Mesh: obj.GetMesh(),
}
}
func (k *listener) OnAdd(obj interface{}, _ bool) {
kobj := obj.(model.KubernetesObject)
if err := k.addTypeInformationToObject(kobj); err != nil {
log.Error(err, "unable to add TypeMeta to KubernetesObject")
return
}
k.out.Send(events.ResourceChangedEvent{
Operation: events.Create,
Type: core_model.ResourceType(kobj.GetObjectKind().GroupVersionKind().Kind),
Key: resourceKey(kobj),
})
}
func (k *listener) OnUpdate(oldObj, newObj interface{}) {
kobj := newObj.(model.KubernetesObject)
if err := k.addTypeInformationToObject(kobj); err != nil {
log.Error(err, "unable to add TypeMeta to KubernetesObject")
return
}
k.out.Send(events.ResourceChangedEvent{
Operation: events.Update,
Type: core_model.ResourceType(kobj.GetObjectKind().GroupVersionKind().Kind),
Key: resourceKey(kobj),
})
}
func (k *listener) OnDelete(obj interface{}) {
kobj := obj.(model.KubernetesObject)
if err := k.addTypeInformationToObject(kobj); err != nil {
log.Error(err, "unable to add TypeMeta to KubernetesObject")
return
}
k.out.Send(events.ResourceChangedEvent{
Operation: events.Delete,
Type: core_model.ResourceType(kobj.GetObjectKind().GroupVersionKind().Kind),
Key: resourceKey(kobj),
})
}
func (k *listener) NeedLeaderElection() bool {
return false
}
func (k *listener) addTypeInformationToObject(obj runtime.Object) error {
gvks, _, err := k.mgr.GetScheme().ObjectKinds(obj)
if err != nil {
return errors.Wrap(err, "missing apiVersion or kind and cannot assign it")
}
for _, gvk := range gvks {
if len(gvk.Kind) == 0 {
continue
}
if len(gvk.Version) == 0 || gvk.Version == runtime.APIVersionInternal {
continue
}
obj.GetObjectKind().SetGroupVersionKind(gvk)
break
}
return nil
}
func (k *listener) createListerWatcher(gvk schema.GroupVersionKind) (cache.ListerWatcher, error) {
mapping, err := k.mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
httpClient, err := rest.HTTPClientFor(k.mgr.GetConfig())
if err != nil {
return nil, errors.Wrap(err, "failed to create HTTP client from Manager config")
}
client, err := apiutil.RESTClientForGVK(gvk, false, k.mgr.GetConfig(), serializer.NewCodecFactory(k.mgr.GetScheme()), httpClient)
if err != nil {
return nil, err
}
listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
listObj, err := k.mgr.GetScheme().New(listGVK)
if err != nil {
return nil, err
}
paramCodec := runtime.NewParameterCodec(k.mgr.GetScheme())
ctx := context.Background()
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
res := listObj.DeepCopyObject()
err := client.Get().
Resource(mapping.Resource.Resource).
VersionedParams(&opts, paramCodec).
Do(ctx).
Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
return client.Get().
Resource(mapping.Resource.Resource).
VersionedParams(&opts, paramCodec).
Watch(ctx)
},
}, nil
}