| /* |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package watcher |
| |
| import ( |
| "errors" |
| "fmt" |
| "log" |
| "time" |
| |
| v1 "k8s.io/api/core/v1" |
| |
| k "k8s.io/client-go/kubernetes" |
| |
| "k8s.io/client-go/tools/cache" |
| |
| v1beta1 "k8s.io/api/extensions/v1beta1" |
| |
| "k8s.io/apimachinery/pkg/fields" |
| pkgruntime "k8s.io/apimachinery/pkg/runtime" |
| utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| |
| "ingress-ats/endpoint" |
| ) |
| |
| // FIXME: watching all namespaces does not work... |
| |
| // Watcher stores all essential information to act on HostGroups |
| type Watcher struct { |
| Cs *k.Clientset |
| ATSNamespace string |
| Ep *endpoint.Endpoint |
| StopChan chan struct{} |
| } |
| |
// EventHandler is the contract a resource watcher must satisfy: three event
// callbacks (Add, Update, Delete) plus an accessor for the name of the
// resource being watched.
type EventHandler interface {
	// Add is invoked with the object that was added.
	Add(obj interface{})

	// Update is invoked with the previous object (obj) and its new state (newObj).
	Update(obj interface{}, newObj interface{})

	// Delete is invoked with the object that was removed.
	Delete(obj interface{})

	// GetResourceName returns the resource name stored by the handler,
	// e.g. "ingresses" or "endpoints".
	GetResourceName() string
}
| |
| // Watch creates necessary threads to watch over resources |
| func (w *Watcher) Watch() error { |
| |
| //================= Watch for Ingress ================== |
| igHandler := IgHandler{"ingresses", w.Ep} |
| err := w.allNamespacesWatchFor(&igHandler, w.Cs.ExtensionsV1beta1().RESTClient(), |
| fields.Everything(), &v1beta1.Ingress{}, 0) |
| if err != nil { |
| return err |
| } |
| //================= Watch for Endpoints ================= |
| epHandler := EpHandler{"endpoints", w.Ep} |
| err = w.allNamespacesWatchFor(&epHandler, w.Cs.CoreV1().RESTClient(), |
| fields.Everything(), &v1.Endpoints{}, 0) |
| if err != nil { |
| return err |
| } |
| //================= Watch for ConfigMaps ================= |
| cmHandler := CMHandler{"configmaps", w.Ep} |
| targetNs := make([]string, 1, 1) |
| targetNs[0] = w.Ep.ATSManager.Namespace |
| err = w.inNamespacesWatchFor(&cmHandler, w.Cs.CoreV1().RESTClient(), |
| targetNs, fields.Everything(), &v1.ConfigMap{}, 0) |
| if err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| func (w *Watcher) allNamespacesWatchFor(h EventHandler, c cache.Getter, |
| fieldSelector fields.Selector, objType pkgruntime.Object, |
| resyncPeriod time.Duration) error { |
| epListWatch := cache.NewListWatchFromClient(c, h.GetResourceName(), v1.NamespaceAll, fieldSelector) |
| sharedInformer := cache.NewSharedInformer(epListWatch, objType, resyncPeriod) |
| |
| sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ |
| AddFunc: h.Add, |
| UpdateFunc: h.Update, |
| DeleteFunc: h.Delete, |
| }) |
| |
| go sharedInformer.Run(w.StopChan) // new thread |
| |
| if !cache.WaitForCacheSync(w.StopChan, sharedInformer.HasSynced) { |
| s := fmt.Sprintf("Timed out waiting for %s caches to sync", h.GetResourceName()) |
| utilruntime.HandleError(fmt.Errorf(s)) |
| return errors.New(s) |
| } |
| return nil |
| } |
| |
| // This is meant to make it easier to add resource watchers on resources that |
| // span multiple namespaces |
| func (w *Watcher) inNamespacesWatchFor(h EventHandler, c cache.Getter, |
| namespaces []string, fieldSelector fields.Selector, objType pkgruntime.Object, |
| resyncPeriod time.Duration) error { |
| if len(namespaces) == 0 { |
| log.Panic("inNamespacesWatchFor must have at least 1 namespace") |
| } |
| syncFuncs := make([]cache.InformerSynced, len(namespaces)) |
| for i, ns := range namespaces { |
| epListWatch := cache.NewListWatchFromClient(c, h.GetResourceName(), ns, fieldSelector) |
| sharedInformer := cache.NewSharedInformer(epListWatch, objType, resyncPeriod) |
| |
| sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ |
| AddFunc: h.Add, |
| UpdateFunc: h.Update, |
| DeleteFunc: h.Delete, |
| }) |
| |
| go sharedInformer.Run(w.StopChan) // new thread |
| |
| syncFuncs[i] = sharedInformer.HasSynced |
| } |
| if !cache.WaitForCacheSync(w.StopChan, syncFuncs...) { |
| s := fmt.Sprintf("Timed out waiting for %s caches to sync", h.GetResourceName()) |
| utilruntime.HandleError(fmt.Errorf(s)) |
| return errors.New(s) |
| } |
| return nil |
| } |