| /* |
| Copyright 2017 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package apiserver |
| |
| import ( |
| "fmt" |
| "sort" |
| "time" |
| |
| "k8s.io/klog" |
| |
| autoscaling "k8s.io/api/autoscaling/v1" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/labels" |
| "k8s.io/apimachinery/pkg/runtime/schema" |
| utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| "k8s.io/apimachinery/pkg/util/wait" |
| "k8s.io/apimachinery/pkg/version" |
| "k8s.io/apiserver/pkg/endpoints/discovery" |
| "k8s.io/client-go/tools/cache" |
| "k8s.io/client-go/util/workqueue" |
| |
| "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" |
| informers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion/apiextensions/internalversion" |
| listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion" |
| ) |
| |
| type DiscoveryController struct { |
| versionHandler *versionDiscoveryHandler |
| groupHandler *groupDiscoveryHandler |
| |
| crdLister listers.CustomResourceDefinitionLister |
| crdsSynced cache.InformerSynced |
| |
| // To allow injection for testing. |
| syncFn func(version schema.GroupVersion) error |
| |
| queue workqueue.RateLimitingInterface |
| } |
| |
| func NewDiscoveryController(crdInformer informers.CustomResourceDefinitionInformer, versionHandler *versionDiscoveryHandler, groupHandler *groupDiscoveryHandler) *DiscoveryController { |
| c := &DiscoveryController{ |
| versionHandler: versionHandler, |
| groupHandler: groupHandler, |
| crdLister: crdInformer.Lister(), |
| crdsSynced: crdInformer.Informer().HasSynced, |
| |
| queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DiscoveryController"), |
| } |
| |
| crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ |
| AddFunc: c.addCustomResourceDefinition, |
| UpdateFunc: c.updateCustomResourceDefinition, |
| DeleteFunc: c.deleteCustomResourceDefinition, |
| }) |
| |
| c.syncFn = c.sync |
| |
| return c |
| } |
| |
| func (c *DiscoveryController) sync(version schema.GroupVersion) error { |
| |
| apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{} |
| apiResourcesForDiscovery := []metav1.APIResource{} |
| versionsForDiscoveryMap := map[metav1.GroupVersion]bool{} |
| |
| crds, err := c.crdLister.List(labels.Everything()) |
| if err != nil { |
| return err |
| } |
| foundVersion := false |
| foundGroup := false |
| for _, crd := range crds { |
| if !apiextensions.IsCRDConditionTrue(crd, apiextensions.Established) { |
| continue |
| } |
| |
| if crd.Spec.Group != version.Group { |
| continue |
| } |
| |
| foundThisVersion := false |
| for _, v := range crd.Spec.Versions { |
| if !v.Served { |
| continue |
| } |
| // If there is any Served version, that means the group should show up in discovery |
| foundGroup = true |
| |
| gv := metav1.GroupVersion{Group: crd.Spec.Group, Version: v.Name} |
| if !versionsForDiscoveryMap[gv] { |
| versionsForDiscoveryMap[gv] = true |
| apiVersionsForDiscovery = append(apiVersionsForDiscovery, metav1.GroupVersionForDiscovery{ |
| GroupVersion: crd.Spec.Group + "/" + v.Name, |
| Version: v.Name, |
| }) |
| } |
| if v.Name == version.Version { |
| foundThisVersion = true |
| } |
| } |
| |
| if !foundThisVersion { |
| continue |
| } |
| foundVersion = true |
| |
| verbs := metav1.Verbs([]string{"delete", "deletecollection", "get", "list", "patch", "create", "update", "watch"}) |
| // if we're terminating we don't allow some verbs |
| if apiextensions.IsCRDConditionTrue(crd, apiextensions.Terminating) { |
| verbs = metav1.Verbs([]string{"delete", "deletecollection", "get", "list", "watch"}) |
| } |
| |
| apiResourcesForDiscovery = append(apiResourcesForDiscovery, metav1.APIResource{ |
| Name: crd.Status.AcceptedNames.Plural, |
| SingularName: crd.Status.AcceptedNames.Singular, |
| Namespaced: crd.Spec.Scope == apiextensions.NamespaceScoped, |
| Kind: crd.Status.AcceptedNames.Kind, |
| Verbs: verbs, |
| ShortNames: crd.Status.AcceptedNames.ShortNames, |
| Categories: crd.Status.AcceptedNames.Categories, |
| }) |
| |
| subresources, err := getSubresourcesForVersion(crd, version.Version) |
| if err != nil { |
| return err |
| } |
| if subresources != nil && subresources.Status != nil { |
| apiResourcesForDiscovery = append(apiResourcesForDiscovery, metav1.APIResource{ |
| Name: crd.Status.AcceptedNames.Plural + "/status", |
| Namespaced: crd.Spec.Scope == apiextensions.NamespaceScoped, |
| Kind: crd.Status.AcceptedNames.Kind, |
| Verbs: metav1.Verbs([]string{"get", "patch", "update"}), |
| }) |
| } |
| |
| if subresources != nil && subresources.Scale != nil { |
| apiResourcesForDiscovery = append(apiResourcesForDiscovery, metav1.APIResource{ |
| Group: autoscaling.GroupName, |
| Version: "v1", |
| Kind: "Scale", |
| Name: crd.Status.AcceptedNames.Plural + "/scale", |
| Namespaced: crd.Spec.Scope == apiextensions.NamespaceScoped, |
| Verbs: metav1.Verbs([]string{"get", "patch", "update"}), |
| }) |
| } |
| } |
| |
| if !foundGroup { |
| c.groupHandler.unsetDiscovery(version.Group) |
| c.versionHandler.unsetDiscovery(version) |
| return nil |
| } |
| |
| sortGroupDiscoveryByKubeAwareVersion(apiVersionsForDiscovery) |
| |
| apiGroup := metav1.APIGroup{ |
| Name: version.Group, |
| Versions: apiVersionsForDiscovery, |
| // the preferred versions for a group is the first item in |
| // apiVersionsForDiscovery after it put in the right ordered |
| PreferredVersion: apiVersionsForDiscovery[0], |
| } |
| c.groupHandler.setDiscovery(version.Group, discovery.NewAPIGroupHandler(Codecs, apiGroup)) |
| |
| if !foundVersion { |
| c.versionHandler.unsetDiscovery(version) |
| return nil |
| } |
| c.versionHandler.setDiscovery(version, discovery.NewAPIVersionHandler(Codecs, version, discovery.APIResourceListerFunc(func() []metav1.APIResource { |
| return apiResourcesForDiscovery |
| }))) |
| |
| return nil |
| } |
| |
| func sortGroupDiscoveryByKubeAwareVersion(gd []metav1.GroupVersionForDiscovery) { |
| sort.Slice(gd, func(i, j int) bool { |
| return version.CompareKubeAwareVersionStrings(gd[i].Version, gd[j].Version) > 0 |
| }) |
| } |
| |
| func (c *DiscoveryController) Run(stopCh <-chan struct{}) { |
| defer utilruntime.HandleCrash() |
| defer c.queue.ShutDown() |
| defer klog.Infof("Shutting down DiscoveryController") |
| |
| klog.Infof("Starting DiscoveryController") |
| |
| if !cache.WaitForCacheSync(stopCh, c.crdsSynced) { |
| utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) |
| return |
| } |
| |
| // only start one worker thread since its a slow moving API |
| go wait.Until(c.runWorker, time.Second, stopCh) |
| |
| <-stopCh |
| } |
| |
| func (c *DiscoveryController) runWorker() { |
| for c.processNextWorkItem() { |
| } |
| } |
| |
| // processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. |
| func (c *DiscoveryController) processNextWorkItem() bool { |
| key, quit := c.queue.Get() |
| if quit { |
| return false |
| } |
| defer c.queue.Done(key) |
| |
| err := c.syncFn(key.(schema.GroupVersion)) |
| if err == nil { |
| c.queue.Forget(key) |
| return true |
| } |
| |
| utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err)) |
| c.queue.AddRateLimited(key) |
| |
| return true |
| } |
| |
| func (c *DiscoveryController) enqueue(obj *apiextensions.CustomResourceDefinition) { |
| for _, v := range obj.Spec.Versions { |
| c.queue.Add(schema.GroupVersion{Group: obj.Spec.Group, Version: v.Name}) |
| } |
| } |
| |
| func (c *DiscoveryController) addCustomResourceDefinition(obj interface{}) { |
| castObj := obj.(*apiextensions.CustomResourceDefinition) |
| klog.V(4).Infof("Adding customresourcedefinition %s", castObj.Name) |
| c.enqueue(castObj) |
| } |
| |
| func (c *DiscoveryController) updateCustomResourceDefinition(oldObj, newObj interface{}) { |
| castNewObj := newObj.(*apiextensions.CustomResourceDefinition) |
| castOldObj := oldObj.(*apiextensions.CustomResourceDefinition) |
| klog.V(4).Infof("Updating customresourcedefinition %s", castOldObj.Name) |
| // Enqueue both old and new object to make sure we remove and add appropriate Versions. |
| // The working queue will resolve any duplicates and only changes will stay in the queue. |
| c.enqueue(castNewObj) |
| c.enqueue(castOldObj) |
| } |
| |
| func (c *DiscoveryController) deleteCustomResourceDefinition(obj interface{}) { |
| castObj, ok := obj.(*apiextensions.CustomResourceDefinition) |
| if !ok { |
| tombstone, ok := obj.(cache.DeletedFinalStateUnknown) |
| if !ok { |
| klog.Errorf("Couldn't get object from tombstone %#v", obj) |
| return |
| } |
| castObj, ok = tombstone.Obj.(*apiextensions.CustomResourceDefinition) |
| if !ok { |
| klog.Errorf("Tombstone contained object that is not expected %#v", obj) |
| return |
| } |
| } |
| klog.V(4).Infof("Deleting customresourcedefinition %q", castObj.Name) |
| c.enqueue(castObj) |
| } |