| /* |
| Copyright 2015 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package namespace |
| |
| import ( |
| "fmt" |
| "time" |
| |
| "k8s.io/api/core/v1" |
| "k8s.io/apimachinery/pkg/api/errors" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| "k8s.io/apimachinery/pkg/util/wait" |
| "k8s.io/client-go/dynamic" |
| coreinformers "k8s.io/client-go/informers/core/v1" |
| clientset "k8s.io/client-go/kubernetes" |
| corelisters "k8s.io/client-go/listers/core/v1" |
| "k8s.io/client-go/tools/cache" |
| "k8s.io/client-go/util/workqueue" |
| "k8s.io/kubernetes/pkg/controller" |
| "k8s.io/kubernetes/pkg/controller/namespace/deletion" |
| "k8s.io/kubernetes/pkg/util/metrics" |
| |
| "k8s.io/klog" |
| ) |
| |
// namespaceDeletionGracePeriod is how long the controller waits after
// receiving a namespace event before it starts processing that namespace.
// The delay gives the rest of the cluster time to catch up:
//   - lifecycle admission plugins on HA apiservers get to observe the namespace
//     deletion and stop admitting new objects into the terminating namespace
//   - non-leader etcd servers get to observe last-minute object creations in
//     the namespace, so this controller's cleanup can actually remove everything
const namespaceDeletionGracePeriod = 5 * time.Second
| |
| // NamespaceController is responsible for performing actions dependent upon a namespace phase |
| type NamespaceController struct { |
| // lister that can list namespaces from a shared cache |
| lister corelisters.NamespaceLister |
| // returns true when the namespace cache is ready |
| listerSynced cache.InformerSynced |
| // namespaces that have been queued up for processing by workers |
| queue workqueue.RateLimitingInterface |
| // helper to delete all resources in the namespace when the namespace is deleted. |
| namespacedResourcesDeleter deletion.NamespacedResourcesDeleterInterface |
| } |
| |
| // NewNamespaceController creates a new NamespaceController |
| func NewNamespaceController( |
| kubeClient clientset.Interface, |
| dynamicClient dynamic.Interface, |
| discoverResourcesFn func() ([]*metav1.APIResourceList, error), |
| namespaceInformer coreinformers.NamespaceInformer, |
| resyncPeriod time.Duration, |
| finalizerToken v1.FinalizerName) *NamespaceController { |
| |
| // create the controller so we can inject the enqueue function |
| namespaceController := &NamespaceController{ |
| queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"), |
| namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(kubeClient.CoreV1().Namespaces(), dynamicClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken, true), |
| } |
| |
| if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { |
| metrics.RegisterMetricAndTrackRateLimiterUsage("namespace_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()) |
| } |
| |
| // configure the namespace informer event handlers |
| namespaceInformer.Informer().AddEventHandlerWithResyncPeriod( |
| cache.ResourceEventHandlerFuncs{ |
| AddFunc: func(obj interface{}) { |
| namespace := obj.(*v1.Namespace) |
| namespaceController.enqueueNamespace(namespace) |
| }, |
| UpdateFunc: func(oldObj, newObj interface{}) { |
| namespace := newObj.(*v1.Namespace) |
| namespaceController.enqueueNamespace(namespace) |
| }, |
| }, |
| resyncPeriod, |
| ) |
| namespaceController.lister = namespaceInformer.Lister() |
| namespaceController.listerSynced = namespaceInformer.Informer().HasSynced |
| |
| return namespaceController |
| } |
| |
| // enqueueNamespace adds an object to the controller work queue |
| // obj could be an *v1.Namespace, or a DeletionFinalStateUnknown item. |
| func (nm *NamespaceController) enqueueNamespace(obj interface{}) { |
| key, err := controller.KeyFunc(obj) |
| if err != nil { |
| utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) |
| return |
| } |
| |
| namespace := obj.(*v1.Namespace) |
| // don't queue if we aren't deleted |
| if namespace.DeletionTimestamp == nil || namespace.DeletionTimestamp.IsZero() { |
| return |
| } |
| |
| // delay processing namespace events to allow HA api servers to observe namespace deletion, |
| // and HA etcd servers to observe last minute object creations inside the namespace |
| nm.queue.AddAfter(key, namespaceDeletionGracePeriod) |
| } |
| |
| // worker processes the queue of namespace objects. |
| // Each namespace can be in the queue at most once. |
| // The system ensures that no two workers can process |
| // the same namespace at the same time. |
| func (nm *NamespaceController) worker() { |
| workFunc := func() bool { |
| key, quit := nm.queue.Get() |
| if quit { |
| return true |
| } |
| defer nm.queue.Done(key) |
| |
| err := nm.syncNamespaceFromKey(key.(string)) |
| if err == nil { |
| // no error, forget this entry and return |
| nm.queue.Forget(key) |
| return false |
| } |
| |
| if estimate, ok := err.(*deletion.ResourcesRemainingError); ok { |
| t := estimate.Estimate/2 + 1 |
| klog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", key, t) |
| nm.queue.AddAfter(key, time.Duration(t)*time.Second) |
| } else { |
| // rather than wait for a full resync, re-add the namespace to the queue to be processed |
| nm.queue.AddRateLimited(key) |
| utilruntime.HandleError(err) |
| } |
| return false |
| } |
| |
| for { |
| quit := workFunc() |
| |
| if quit { |
| return |
| } |
| } |
| } |
| |
| // syncNamespaceFromKey looks for a namespace with the specified key in its store and synchronizes it |
| func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) { |
| startTime := time.Now() |
| defer func() { |
| klog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Since(startTime)) |
| }() |
| |
| namespace, err := nm.lister.Get(key) |
| if errors.IsNotFound(err) { |
| klog.Infof("Namespace has been deleted %v", key) |
| return nil |
| } |
| if err != nil { |
| utilruntime.HandleError(fmt.Errorf("Unable to retrieve namespace %v from store: %v", key, err)) |
| return err |
| } |
| return nm.namespacedResourcesDeleter.Delete(namespace.Name) |
| } |
| |
| // Run starts observing the system with the specified number of workers. |
| func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) { |
| defer utilruntime.HandleCrash() |
| defer nm.queue.ShutDown() |
| |
| klog.Infof("Starting namespace controller") |
| defer klog.Infof("Shutting down namespace controller") |
| |
| if !controller.WaitForCacheSync("namespace", stopCh, nm.listerSynced) { |
| return |
| } |
| |
| klog.V(5).Info("Starting workers of namespace controller") |
| for i := 0; i < workers; i++ { |
| go wait.Until(nm.worker, time.Second, stopCh) |
| } |
| <-stopCh |
| } |