| /* |
| Copyright 2014 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| // Package factory can set up a scheduler. This code is here instead of |
| // cmd/scheduler for both testability and reuse. |
| package factory |
| |
| import ( |
| "fmt" |
| "os" |
| "os/signal" |
| "reflect" |
| "time" |
| |
| "k8s.io/klog" |
| |
| "k8s.io/api/core/v1" |
| storagev1 "k8s.io/api/storage/v1" |
| "k8s.io/apimachinery/pkg/api/errors" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/fields" |
| "k8s.io/apimachinery/pkg/labels" |
| "k8s.io/apimachinery/pkg/runtime/schema" |
| "k8s.io/apimachinery/pkg/types" |
| "k8s.io/apimachinery/pkg/util/runtime" |
| "k8s.io/apimachinery/pkg/util/sets" |
| "k8s.io/apimachinery/pkg/util/wait" |
| utilfeature "k8s.io/apiserver/pkg/util/feature" |
| appsinformers "k8s.io/client-go/informers/apps/v1" |
| coreinformers "k8s.io/client-go/informers/core/v1" |
| policyinformers "k8s.io/client-go/informers/policy/v1beta1" |
| storageinformers "k8s.io/client-go/informers/storage/v1" |
| clientset "k8s.io/client-go/kubernetes" |
| appslisters "k8s.io/client-go/listers/apps/v1" |
| corelisters "k8s.io/client-go/listers/core/v1" |
| policylisters "k8s.io/client-go/listers/policy/v1beta1" |
| storagelisters "k8s.io/client-go/listers/storage/v1" |
| "k8s.io/client-go/tools/cache" |
| "k8s.io/client-go/tools/record" |
| podutil "k8s.io/kubernetes/pkg/api/v1/pod" |
| "k8s.io/kubernetes/pkg/apis/core/helper" |
| "k8s.io/kubernetes/pkg/features" |
| kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" |
| "k8s.io/kubernetes/pkg/scheduler/algorithm" |
| "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" |
| schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" |
| "k8s.io/kubernetes/pkg/scheduler/api/validation" |
| "k8s.io/kubernetes/pkg/scheduler/core" |
| "k8s.io/kubernetes/pkg/scheduler/core/equivalence" |
| schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" |
| cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger" |
| internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" |
| "k8s.io/kubernetes/pkg/scheduler/util" |
| "k8s.io/kubernetes/pkg/scheduler/volumebinder" |
| ) |
| |
const (
	// initialGetBackoff is the starting delay for retrying apiserver Get
	// calls; maximalGetBackoff caps that delay. (Their use is in the error
	// handling elsewhere in this package — not visible in this chunk.)
	initialGetBackoff = 100 * time.Millisecond
	maximalGetBackoff = time.Minute
)
| |
var (
	// Predicate-key sets handed to equivalence-cache invalidation when the
	// corresponding cluster objects change.
	serviceAffinitySet       = sets.NewString(predicates.CheckServiceAffinityPred)
	matchInterPodAffinitySet = sets.NewString(predicates.MatchInterPodAffinityPred)
	generalPredicatesSets    = sets.NewString(predicates.GeneralPred)
	noDiskConflictSet        = sets.NewString(predicates.NoDiskConflictPred)
	// maxPDVolumeCountPredicateKeys lists the per-cloud volume-count
	// predicates that PV/PVC changes can affect.
	maxPDVolumeCountPredicateKeys = []string{predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred, predicates.MaxEBSVolumeCountPred}
)
| |
// Binder knows how to write a binding.
type Binder interface {
	// Bind persists the given pod-to-node binding; implementations
	// typically write it to the apiserver.
	Bind(binding *v1.Binding) error
}
| |
// PodConditionUpdater updates the condition of a pod based on the passed
// PodCondition
type PodConditionUpdater interface {
	// Update applies podCondition to pod's status.
	Update(pod *v1.Pod, podCondition *v1.PodCondition) error
}
| |
// Config is an implementation of the Scheduler's configured input data.
// TODO over time we should make this struct a hidden implementation detail of the scheduler.
type Config struct {
	// It is expected that changes made via SchedulerCache will be observed
	// by NodeLister and Algorithm.
	SchedulerCache schedulerinternalcache.Cache
	// Ecache is used for optimistically invalidating affected cache items
	// after successfully binding a pod.
	Ecache *equivalence.Cache
	// NodeLister lists the nodes visible to the scheduler.
	NodeLister algorithm.NodeLister
	// Algorithm makes the actual scheduling decision for a pod.
	Algorithm algorithm.ScheduleAlgorithm
	// GetBinder returns the Binder to use for the given pod.
	GetBinder func(pod *v1.Pod) Binder
	// PodConditionUpdater is used only in case of scheduling errors. If we succeed
	// with scheduling, PodScheduled condition will be updated in apiserver in /bind
	// handler so that binding and setting PodCondition it is atomic.
	PodConditionUpdater PodConditionUpdater
	// PodPreemptor is used to evict pods and update pod annotations.
	PodPreemptor PodPreemptor

	// NextPod should be a function that blocks until the next pod
	// is available. We don't use a channel for this, because scheduling
	// a pod may take some amount of time and we don't want pods to get
	// stale while they sit in a channel.
	NextPod func() *v1.Pod

	// WaitForCacheSync waits for scheduler cache to populate.
	// It returns true if it was successful, false if the controller should shutdown.
	WaitForCacheSync func() bool

	// Error is called if there is an error. It is passed the pod in
	// question, and the error
	Error func(*v1.Pod, error)

	// Recorder is the EventRecorder to use
	Recorder record.EventRecorder

	// Close this to shut down the scheduler.
	StopEverything <-chan struct{}

	// VolumeBinder handles PVC/PV binding for the pod.
	VolumeBinder *volumebinder.VolumeBinder

	// Disable pod preemption or not.
	DisablePreemption bool

	// SchedulingQueue holds pods to be scheduled
	SchedulingQueue internalqueue.SchedulingQueue
}
| |
// PodPreemptor has methods needed to delete a pod and to update
// annotations of the preemptor pod.
type PodPreemptor interface {
	// GetUpdatedPod returns the latest version of the given pod.
	GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
	// DeletePod evicts the given pod.
	DeletePod(pod *v1.Pod) error
	// SetNominatedNodeName records nominatedNode on the pod's status.
	SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
	// RemoveNominatedNodeName clears any previously nominated node.
	RemoveNominatedNodeName(pod *v1.Pod) error
}
| |
// Configurator defines I/O, caching, and other functionality needed to
// construct a new scheduler. An implementation of this can be seen in
// factory.go.
type Configurator interface {
	// Exposed for testing
	GetHardPodAffinitySymmetricWeight() int32
	// Exposed for testing
	MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error)

	// Predicate related accessors to be exposed for use by k8s.io/autoscaler/cluster-autoscaler
	GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error)
	GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error)

	// Needs to be exposed for things like integration tests where we want to make fake nodes.
	GetNodeLister() corelisters.NodeLister
	// Exposed for testing
	GetClient() clientset.Interface
	// Exposed for testing
	GetScheduledPodLister() corelisters.PodLister

	// Create builds a Config from the default algorithm provider.
	Create() (*Config, error)
	// CreateFromProvider builds a Config from a named algorithm provider.
	CreateFromProvider(providerName string) (*Config, error)
	// CreateFromConfig builds a Config from a scheduler Policy object.
	CreateFromConfig(policy schedulerapi.Policy) (*Config, error)
	// CreateFromKeys builds a Config from explicit predicate/priority keys
	// plus any extenders.
	CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error)
}
| |
// configFactory is the default implementation of the scheduler.Configurator interface.
type configFactory struct {
	client clientset.Interface
	// queue for pods that need scheduling
	podQueue internalqueue.SchedulingQueue
	// a means to list all known scheduled pods.
	scheduledPodLister corelisters.PodLister
	// a means to list all known scheduled pods and pods assumed to have been scheduled.
	podLister algorithm.PodLister
	// a means to list all nodes
	nodeLister corelisters.NodeLister
	// a means to list all PersistentVolumes
	pVLister corelisters.PersistentVolumeLister
	// a means to list all PersistentVolumeClaims
	pVCLister corelisters.PersistentVolumeClaimLister
	// a means to list all services
	serviceLister corelisters.ServiceLister
	// a means to list all controllers
	controllerLister corelisters.ReplicationControllerLister
	// a means to list all replicasets
	replicaSetLister appslisters.ReplicaSetLister
	// a means to list all statefulsets
	statefulSetLister appslisters.StatefulSetLister
	// a means to list all PodDisruptionBudgets
	pdbLister policylisters.PodDisruptionBudgetLister
	// a means to list all StorageClasses
	storageClassLister storagelisters.StorageClassLister

	// Close this to stop all reflectors
	StopEverything <-chan struct{}

	scheduledPodsHasSynced cache.InformerSynced

	schedulerCache schedulerinternalcache.Cache

	// SchedulerName of a scheduler is used to select which pods will be
	// processed by this scheduler, based on the pod's "spec.schedulerName".
	schedulerName string

	// RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
	// corresponding to every RequiredDuringScheduling affinity rule.
	// HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
	hardPodAffinitySymmetricWeight int32

	// Equivalence class cache
	equivalencePodCache *equivalence.Cache

	// Enable equivalence class cache
	enableEquivalenceClassCache bool

	// Handles volume binding decisions
	volumeBinder *volumebinder.VolumeBinder

	// Always check all predicates even if the middle of one predicate fails.
	alwaysCheckAllPredicates bool

	// Disable pod preemption or not.
	disablePreemption bool

	// percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle.
	percentageOfNodesToScore int32
}
| |
// ConfigFactoryArgs is a set of arguments passed to NewConfigFactory.
type ConfigFactoryArgs struct {
	SchedulerName                  string
	Client                         clientset.Interface
	NodeInformer                   coreinformers.NodeInformer
	PodInformer                    coreinformers.PodInformer
	PvInformer                     coreinformers.PersistentVolumeInformer
	PvcInformer                    coreinformers.PersistentVolumeClaimInformer
	ReplicationControllerInformer  coreinformers.ReplicationControllerInformer
	ReplicaSetInformer             appsinformers.ReplicaSetInformer
	StatefulSetInformer            appsinformers.StatefulSetInformer
	ServiceInformer                coreinformers.ServiceInformer
	PdbInformer                    policyinformers.PodDisruptionBudgetInformer
	// StorageClassInformer may be nil when the VolumeScheduling feature
	// gate is off (see NewConfigFactory).
	StorageClassInformer           storageinformers.StorageClassInformer
	HardPodAffinitySymmetricWeight int32
	EnableEquivalenceClassCache    bool
	DisablePreemption              bool
	PercentageOfNodesToScore       int32
	// BindTimeoutSeconds bounds how long volume binding may take.
	BindTimeoutSeconds             int64
	// StopCh closes to stop all reflectors; nil means "never stop".
	StopCh                         <-chan struct{}
}
| |
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
	stopEverything := args.StopCh
	if stopEverything == nil {
		stopEverything = wait.NeverStop
	}
	// NOTE(review): 30s is presumably the assumed-pod expiration for the
	// internal cache — confirm against schedulerinternalcache.New.
	schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)

	// storageClassInformer is only enabled through VolumeScheduling feature gate
	var storageClassLister storagelisters.StorageClassLister
	if args.StorageClassInformer != nil {
		storageClassLister = args.StorageClassInformer.Lister()
	}
	c := &configFactory{
		client:                         args.Client,
		podLister:                      schedulerCache,
		podQueue:                       internalqueue.NewSchedulingQueue(),
		nodeLister:                     args.NodeInformer.Lister(),
		pVLister:                       args.PvInformer.Lister(),
		pVCLister:                      args.PvcInformer.Lister(),
		serviceLister:                  args.ServiceInformer.Lister(),
		controllerLister:               args.ReplicationControllerInformer.Lister(),
		replicaSetLister:               args.ReplicaSetInformer.Lister(),
		statefulSetLister:              args.StatefulSetInformer.Lister(),
		pdbLister:                      args.PdbInformer.Lister(),
		storageClassLister:             storageClassLister,
		schedulerCache:                 schedulerCache,
		StopEverything:                 stopEverything,
		schedulerName:                  args.SchedulerName,
		hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
		enableEquivalenceClassCache:    args.EnableEquivalenceClassCache,
		disablePreemption:              args.DisablePreemption,
		percentageOfNodesToScore:       args.PercentageOfNodesToScore,
	}

	c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
	// scheduled pod cache: pods with a bound node feed the scheduler cache.
	args.PodInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				switch t := obj.(type) {
				case *v1.Pod:
					return assignedPod(t)
				case cache.DeletedFinalStateUnknown:
					// Tombstone delivered when the watch missed the final
					// delete event; unwrap it before filtering.
					if pod, ok := t.Obj.(*v1.Pod); ok {
						return assignedPod(pod)
					}
					runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
					return false
				default:
					runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
					return false
				}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    c.addPodToCache,
				UpdateFunc: c.updatePodInCache,
				DeleteFunc: c.deletePodFromCache,
			},
		},
	)
	// unscheduled pod queue: unassigned pods owned by this scheduler name.
	args.PodInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				switch t := obj.(type) {
				case *v1.Pod:
					return !assignedPod(t) && responsibleForPod(t, args.SchedulerName)
				case cache.DeletedFinalStateUnknown:
					if pod, ok := t.Obj.(*v1.Pod); ok {
						return !assignedPod(pod) && responsibleForPod(pod, args.SchedulerName)
					}
					runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
					return false
				default:
					runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
					return false
				}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    c.addPodToSchedulingQueue,
				UpdateFunc: c.updatePodInSchedulingQueue,
				DeleteFunc: c.deletePodFromSchedulingQueue,
			},
		},
	)
	// ScheduledPodLister is something we provide to plug-in functions that
	// they may need to call.
	c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}

	args.NodeInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    c.addNodeToCache,
			UpdateFunc: c.updateNodeInCache,
			DeleteFunc: c.deleteNodeFromCache,
		},
	)

	// On add and delete of PVs, it will affect equivalence cache items
	// related to persistent volume
	args.PvInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
			AddFunc:    c.onPvAdd,
			UpdateFunc: c.onPvUpdate,
			DeleteFunc: c.onPvDelete,
		},
	)

	// This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
	args.PvcInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    c.onPvcAdd,
			UpdateFunc: c.onPvcUpdate,
			DeleteFunc: c.onPvcDelete,
		},
	)

	// This is for ServiceAffinity: affected by the selector of the service is updated.
	// Also, if new service is added, equivalence cache will also become invalid since
	// existing pods may be "captured" by this service and change this predicate result.
	args.ServiceInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    c.onServiceAdd,
			UpdateFunc: c.onServiceUpdate,
			DeleteFunc: c.onServiceDelete,
		},
	)

	// Existing equivalence cache should not be affected by add/delete RC/Deployment etc,
	// it only make sense when pod is scheduled or deleted

	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		// Setup volume binder
		c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)

		args.StorageClassInformer.Informer().AddEventHandler(
			cache.ResourceEventHandlerFuncs{
				AddFunc:    c.onStorageClassAdd,
				DeleteFunc: c.onStorageClassDelete,
			},
		)
	}

	// Setup cache comparer
	debugger := cachedebugger.New(
		args.NodeInformer.Lister(),
		args.PodInformer.Lister(),
		c.schedulerCache,
		c.podQueue,
	)

	// On compareSignal (an OS signal, defined elsewhere in this package),
	// dump/compare the scheduler cache for debugging.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, compareSignal)

	go func() {
		for {
			select {
			case <-c.StopEverything:
				c.podQueue.Close()
				return
			case <-ch:
				debugger.Comparer.Compare()
				debugger.Dumper.DumpAll()
			}
		}
	}()

	return c
}
| |
| // skipPodUpdate checks whether the specified pod update should be ignored. |
| // This function will return true if |
| // - The pod has already been assumed, AND |
| // - The pod has only its ResourceVersion, Spec.NodeName and/or Annotations |
| // updated. |
| func (c *configFactory) skipPodUpdate(pod *v1.Pod) bool { |
| // Non-assumed pods should never be skipped. |
| isAssumed, err := c.schedulerCache.IsAssumedPod(pod) |
| if err != nil { |
| runtime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err)) |
| return false |
| } |
| if !isAssumed { |
| return false |
| } |
| |
| // Gets the assumed pod from the cache. |
| assumedPod, err := c.schedulerCache.GetPod(pod) |
| if err != nil { |
| runtime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err)) |
| return false |
| } |
| |
| // Compares the assumed pod in the cache with the pod update. If they are |
| // equal (with certain fields excluded), this pod update will be skipped. |
| f := func(pod *v1.Pod) *v1.Pod { |
| p := pod.DeepCopy() |
| // ResourceVersion must be excluded because each object update will |
| // have a new resource version. |
| p.ResourceVersion = "" |
| // Spec.NodeName must be excluded because the pod assumed in the cache |
| // is expected to have a node assigned while the pod update may nor may |
| // not have this field set. |
| p.Spec.NodeName = "" |
| // Annotations must be excluded for the reasons described in |
| // https://github.com/kubernetes/kubernetes/issues/52914. |
| p.Annotations = nil |
| return p |
| } |
| assumedPodCopy, podCopy := f(assumedPod), f(pod) |
| if !reflect.DeepEqual(assumedPodCopy, podCopy) { |
| return false |
| } |
| klog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name) |
| return true |
| } |
| |
| func (c *configFactory) onPvAdd(obj interface{}) { |
| if c.enableEquivalenceClassCache { |
| pv, ok := obj.(*v1.PersistentVolume) |
| if !ok { |
| klog.Errorf("cannot convert to *v1.PersistentVolume: %v", obj) |
| return |
| } |
| c.invalidatePredicatesForPv(pv) |
| } |
| // Pods created when there are no PVs available will be stuck in |
| // unschedulable queue. But unbound PVs created for static provisioning and |
| // delay binding storage class are skipped in PV controller dynamic |
| // provisiong and binding process, will not trigger events to schedule pod |
| // again. So we need to move pods to active queue on PV add for this |
| // scenario. |
| c.podQueue.MoveAllToActiveQueue() |
| } |
| |
| func (c *configFactory) onPvUpdate(old, new interface{}) { |
| if c.enableEquivalenceClassCache { |
| newPV, ok := new.(*v1.PersistentVolume) |
| if !ok { |
| klog.Errorf("cannot convert to *v1.PersistentVolume: %v", new) |
| return |
| } |
| oldPV, ok := old.(*v1.PersistentVolume) |
| if !ok { |
| klog.Errorf("cannot convert to *v1.PersistentVolume: %v", old) |
| return |
| } |
| c.invalidatePredicatesForPvUpdate(oldPV, newPV) |
| } |
| // Scheduler.bindVolumesWorker may fail to update assumed pod volume |
| // bindings due to conflicts if PVs are updated by PV controller or other |
| // parties, then scheduler will add pod back to unschedulable queue. We |
| // need to move pods to active queue on PV update for this scenario. |
| c.podQueue.MoveAllToActiveQueue() |
| } |
| |
| func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.PersistentVolume) { |
| invalidPredicates := sets.NewString() |
| // CheckVolumeBinding predicate calls SchedulerVolumeBinder.FindPodVolumes |
| // which will cache PVs in PodBindingCache. When PV got updated, we should |
| // invalidate cache, otherwise PVAssumeCache.Assume will fail with out of sync |
| // error. |
| if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { |
| invalidPredicates.Insert(predicates.CheckVolumeBindingPred) |
| } |
| for k, v := range newPV.Labels { |
| // If PV update modifies the zone/region labels. |
| if isZoneRegionLabel(k) && !reflect.DeepEqual(v, oldPV.Labels[k]) { |
| invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred) |
| break |
| } |
| } |
| c.equivalencePodCache.InvalidatePredicates(invalidPredicates) |
| } |
| |
| // isZoneRegionLabel check if given key of label is zone or region label. |
| func isZoneRegionLabel(k string) bool { |
| return k == kubeletapis.LabelZoneFailureDomain || k == kubeletapis.LabelZoneRegion |
| } |
| |
| func (c *configFactory) onPvDelete(obj interface{}) { |
| if c.enableEquivalenceClassCache { |
| var pv *v1.PersistentVolume |
| switch t := obj.(type) { |
| case *v1.PersistentVolume: |
| pv = t |
| case cache.DeletedFinalStateUnknown: |
| var ok bool |
| pv, ok = t.Obj.(*v1.PersistentVolume) |
| if !ok { |
| klog.Errorf("cannot convert to *v1.PersistentVolume: %v", t.Obj) |
| return |
| } |
| default: |
| klog.Errorf("cannot convert to *v1.PersistentVolume: %v", t) |
| return |
| } |
| c.invalidatePredicatesForPv(pv) |
| } |
| } |
| |
| func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) { |
| // You could have a PVC that points to a PV, but the PV object doesn't exist. |
| // So when the PV object gets added, we can recount. |
| invalidPredicates := sets.NewString() |
| |
| // PV types which impact MaxPDVolumeCountPredicate |
| if pv.Spec.AWSElasticBlockStore != nil { |
| invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred) |
| } |
| if pv.Spec.GCEPersistentDisk != nil { |
| invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred) |
| } |
| if pv.Spec.AzureDisk != nil { |
| invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred) |
| } |
| |
| if pv.Spec.CSI != nil && utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { |
| invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred) |
| } |
| |
| // If PV contains zone related label, it may impact cached NoVolumeZoneConflict |
| for k := range pv.Labels { |
| if isZoneRegionLabel(k) { |
| invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred) |
| break |
| } |
| } |
| |
| if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { |
| // Add/delete impacts the available PVs to choose from |
| invalidPredicates.Insert(predicates.CheckVolumeBindingPred) |
| } |
| |
| c.equivalencePodCache.InvalidatePredicates(invalidPredicates) |
| } |
| |
| func (c *configFactory) onPvcAdd(obj interface{}) { |
| if c.enableEquivalenceClassCache { |
| pvc, ok := obj.(*v1.PersistentVolumeClaim) |
| if !ok { |
| klog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", obj) |
| return |
| } |
| c.invalidatePredicatesForPvc(pvc) |
| } |
| c.podQueue.MoveAllToActiveQueue() |
| } |
| |
| func (c *configFactory) onPvcUpdate(old, new interface{}) { |
| if !utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { |
| return |
| } |
| |
| if c.enableEquivalenceClassCache { |
| newPVC, ok := new.(*v1.PersistentVolumeClaim) |
| if !ok { |
| klog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", new) |
| return |
| } |
| oldPVC, ok := old.(*v1.PersistentVolumeClaim) |
| if !ok { |
| klog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", old) |
| return |
| } |
| c.invalidatePredicatesForPvcUpdate(oldPVC, newPVC) |
| } |
| c.podQueue.MoveAllToActiveQueue() |
| } |
| |
| func (c *configFactory) onPvcDelete(obj interface{}) { |
| if c.enableEquivalenceClassCache { |
| var pvc *v1.PersistentVolumeClaim |
| switch t := obj.(type) { |
| case *v1.PersistentVolumeClaim: |
| pvc = t |
| case cache.DeletedFinalStateUnknown: |
| var ok bool |
| pvc, ok = t.Obj.(*v1.PersistentVolumeClaim) |
| if !ok { |
| klog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", t.Obj) |
| return |
| } |
| default: |
| klog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", t) |
| return |
| } |
| c.invalidatePredicatesForPvc(pvc) |
| } |
| } |
| |
| func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) { |
| // We need to do this here because the ecache uses PVC uid as part of equivalence hash of pod |
| |
| // The bound volume type may change |
| invalidPredicates := sets.NewString(maxPDVolumeCountPredicateKeys...) |
| |
| if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { |
| invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred) |
| } |
| |
| // The bound volume's label may change |
| invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred) |
| |
| if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { |
| // Add/delete impacts the available PVs to choose from |
| invalidPredicates.Insert(predicates.CheckVolumeBindingPred) |
| } |
| c.equivalencePodCache.InvalidatePredicates(invalidPredicates) |
| } |
| |
| func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) { |
| invalidPredicates := sets.NewString() |
| |
| if old.Spec.VolumeName != new.Spec.VolumeName { |
| if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { |
| // PVC volume binding has changed |
| invalidPredicates.Insert(predicates.CheckVolumeBindingPred) |
| } |
| // The bound volume type may change |
| invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...) |
| |
| if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { |
| invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred) |
| } |
| } |
| |
| c.equivalencePodCache.InvalidatePredicates(invalidPredicates) |
| } |
| |
| func (c *configFactory) onStorageClassAdd(obj interface{}) { |
| sc, ok := obj.(*storagev1.StorageClass) |
| if !ok { |
| klog.Errorf("cannot convert to *storagev1.StorageClass: %v", obj) |
| return |
| } |
| |
| // CheckVolumeBindingPred fails if pod has unbound immediate PVCs. If these |
| // PVCs have specified StorageClass name, creating StorageClass objects |
| // with late binding will cause predicates to pass, so we need to move pods |
| // to active queue. |
| // We don't need to invalidate cached results because results will not be |
| // cached for pod that has unbound immediate PVCs. |
| if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer { |
| c.podQueue.MoveAllToActiveQueue() |
| } |
| } |
| |
| func (c *configFactory) onStorageClassDelete(obj interface{}) { |
| if c.enableEquivalenceClassCache { |
| var sc *storagev1.StorageClass |
| switch t := obj.(type) { |
| case *storagev1.StorageClass: |
| sc = t |
| case cache.DeletedFinalStateUnknown: |
| var ok bool |
| sc, ok = t.Obj.(*storagev1.StorageClass) |
| if !ok { |
| klog.Errorf("cannot convert to *storagev1.StorageClass: %v", t.Obj) |
| return |
| } |
| default: |
| klog.Errorf("cannot convert to *storagev1.StorageClass: %v", t) |
| return |
| } |
| c.invalidatePredicatesForStorageClass(sc) |
| } |
| } |
| |
| func (c *configFactory) invalidatePredicatesForStorageClass(sc *storagev1.StorageClass) { |
| invalidPredicates := sets.NewString() |
| |
| if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { |
| if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer { |
| // Delete can cause predicates to fail |
| invalidPredicates.Insert(predicates.CheckVolumeBindingPred) |
| invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred) |
| } |
| } |
| |
| c.equivalencePodCache.InvalidatePredicates(invalidPredicates) |
| } |
| |
// onServiceAdd invalidates ServiceAffinity results (a new service may
// "capture" existing pods) and reactivates unschedulable pods.
func (c *configFactory) onServiceAdd(obj interface{}) {
	if c.enableEquivalenceClassCache {
		c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)
	}
	c.podQueue.MoveAllToActiveQueue()
}
| |
| func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) { |
| if c.enableEquivalenceClassCache { |
| // TODO(resouer) We may need to invalidate this for specified group of pods only |
| oldService := oldObj.(*v1.Service) |
| newService := newObj.(*v1.Service) |
| if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) { |
| c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet) |
| } |
| } |
| c.podQueue.MoveAllToActiveQueue() |
| } |
| |
// onServiceDelete invalidates ServiceAffinity results (pods previously
// "captured" by the service may now fit) and reactivates unschedulable pods.
func (c *configFactory) onServiceDelete(obj interface{}) {
	if c.enableEquivalenceClassCache {
		c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)
	}
	c.podQueue.MoveAllToActiveQueue()
}
| |
// GetNodeLister provides the node lister, mostly internal use, but may also be called by mock-tests.
func (c *configFactory) GetNodeLister() corelisters.NodeLister {
	return c.nodeLister
}
| |
// GetHardPodAffinitySymmetricWeight returns the configured weight (0-100) of
// the implicit PreferredDuringScheduling affinity rule.
func (c *configFactory) GetHardPodAffinitySymmetricWeight() int32 {
	return c.hardPodAffinitySymmetricWeight
}
| |
// GetSchedulerName returns the scheduler name this factory was configured
// with; pods are matched against it via "spec.schedulerName".
func (c *configFactory) GetSchedulerName() string {
	return c.schedulerName
}
| |
// GetClient provides a kubernetes Client, mostly internal use, but may also be called by mock-tests.
func (c *configFactory) GetClient() clientset.Interface {
	return c.client
}
| |
// GetScheduledPodLister provides a lister over assigned (scheduled) pods,
// mostly internal use, but may also be called by mock-tests.
func (c *configFactory) GetScheduledPodLister() corelisters.PodLister {
	return c.scheduledPodLister
}
| |
// addPodToCache records a newly observed assigned pod in the scheduler cache
// and notifies the scheduling queue that an assigned pod was added (which may
// make pending pods with (anti-)affinity on it schedulable).
func (c *configFactory) addPodToCache(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		klog.Errorf("cannot convert to *v1.Pod: %v", obj)
		return
	}

	if err := c.schedulerCache.AddPod(pod); err != nil {
		klog.Errorf("scheduler cache AddPod failed: %v", err)
	}

	c.podQueue.AssignedPodAdded(pod)

	// NOTE: Updating equivalence cache of addPodToCache has been
	// handled optimistically in: pkg/scheduler/scheduler.go#assume()
}
| |
// updatePodInCache applies an update of an assigned pod to the scheduler
// cache, invalidates equivalence-cache entries the change affects, and
// notifies the scheduling queue.
func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
	oldPod, ok := oldObj.(*v1.Pod)
	if !ok {
		klog.Errorf("cannot convert oldObj to *v1.Pod: %v", oldObj)
		return
	}
	newPod, ok := newObj.(*v1.Pod)
	if !ok {
		klog.Errorf("cannot convert newObj to *v1.Pod: %v", newObj)
		return
	}

	// NOTE: Updates must be written to scheduler cache before invalidating
	// equivalence cache, because we could snapshot equivalence cache after the
	// invalidation and then snapshot the cache itself. If the cache is
	// snapshotted before updates are written, we would update equivalence
	// cache with stale information which is based on snapshot of old cache.
	if err := c.schedulerCache.UpdatePod(oldPod, newPod); err != nil {
		klog.Errorf("scheduler cache UpdatePod failed: %v", err)
	}

	c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod)
	c.podQueue.AssignedPodUpdated(newPod)
}
| |
// addPodToSchedulingQueue enqueues a newly observed unscheduled pod.
// The informer's FilterFunc only passes *v1.Pod here, so the unchecked
// assertion is safe in practice.
func (c *configFactory) addPodToSchedulingQueue(obj interface{}) {
	if err := c.podQueue.Add(obj.(*v1.Pod)); err != nil {
		runtime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
	}
}
| |
| func (c *configFactory) updatePodInSchedulingQueue(oldObj, newObj interface{}) { |
| pod := newObj.(*v1.Pod) |
| if c.skipPodUpdate(pod) { |
| return |
| } |
| if err := c.podQueue.Update(oldObj.(*v1.Pod), pod); err != nil { |
| runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err)) |
| } |
| } |
| |
| func (c *configFactory) deletePodFromSchedulingQueue(obj interface{}) { |
| var pod *v1.Pod |
| switch t := obj.(type) { |
| case *v1.Pod: |
| pod = obj.(*v1.Pod) |
| case cache.DeletedFinalStateUnknown: |
| var ok bool |
| pod, ok = t.Obj.(*v1.Pod) |
| if !ok { |
| runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c)) |
| return |
| } |
| default: |
| runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj)) |
| return |
| } |
| if err := c.podQueue.Delete(pod); err != nil { |
| runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err)) |
| } |
| if c.volumeBinder != nil { |
| // Volume binder only wants to keep unassigned pods |
| c.volumeBinder.DeletePodBindings(pod) |
| } |
| } |
| |
| func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) { |
| if c.enableEquivalenceClassCache { |
| // if the pod does not have bound node, updating equivalence cache is meaningless; |
| // if pod's bound node has been changed, that case should be handled by pod add & delete. |
| if len(newPod.Spec.NodeName) != 0 && newPod.Spec.NodeName == oldPod.Spec.NodeName { |
| if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) { |
| // MatchInterPodAffinity need to be reconsidered for this node, |
| // as well as all nodes in its same failure domain. |
| c.equivalencePodCache.InvalidatePredicates( |
| matchInterPodAffinitySet) |
| } |
| // if requested container resource changed, invalidate GeneralPredicates of this node |
| if !reflect.DeepEqual(predicates.GetResourceRequest(newPod), |
| predicates.GetResourceRequest(oldPod)) { |
| c.equivalencePodCache.InvalidatePredicatesOnNode( |
| newPod.Spec.NodeName, generalPredicatesSets) |
| } |
| } |
| } |
| } |
| |
| func (c *configFactory) deletePodFromCache(obj interface{}) { |
| var pod *v1.Pod |
| switch t := obj.(type) { |
| case *v1.Pod: |
| pod = t |
| case cache.DeletedFinalStateUnknown: |
| var ok bool |
| pod, ok = t.Obj.(*v1.Pod) |
| if !ok { |
| klog.Errorf("cannot convert to *v1.Pod: %v", t.Obj) |
| return |
| } |
| default: |
| klog.Errorf("cannot convert to *v1.Pod: %v", t) |
| return |
| } |
| // NOTE: Updates must be written to scheduler cache before invalidating |
| // equivalence cache, because we could snapshot equivalence cache after the |
| // invalidation and then snapshot the cache itself. If the cache is |
| // snapshotted before updates are written, we would update equivalence |
| // cache with stale information which is based on snapshot of old cache. |
| if err := c.schedulerCache.RemovePod(pod); err != nil { |
| klog.Errorf("scheduler cache RemovePod failed: %v", err) |
| } |
| |
| c.invalidateCachedPredicatesOnDeletePod(pod) |
| c.podQueue.MoveAllToActiveQueue() |
| } |
| |
| func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) { |
| if c.enableEquivalenceClassCache { |
| // part of this case is the same as pod add. |
| c.equivalencePodCache.InvalidateCachedPredicateItemForPodAdd(pod, pod.Spec.NodeName) |
| // MatchInterPodAffinity need to be reconsidered for this node, |
| // as well as all nodes in its same failure domain. |
| // TODO(resouer) can we just do this for nodes in the same failure domain |
| c.equivalencePodCache.InvalidatePredicates( |
| matchInterPodAffinitySet) |
| |
| // if this pod have these PV, cached result of disk conflict will become invalid. |
| for _, volume := range pod.Spec.Volumes { |
| if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil || |
| volume.RBD != nil || volume.ISCSI != nil { |
| c.equivalencePodCache.InvalidatePredicatesOnNode( |
| pod.Spec.NodeName, noDiskConflictSet) |
| } |
| } |
| } |
| } |
| |
| func (c *configFactory) addNodeToCache(obj interface{}) { |
| node, ok := obj.(*v1.Node) |
| if !ok { |
| klog.Errorf("cannot convert to *v1.Node: %v", obj) |
| return |
| } |
| |
| // NOTE: Because the scheduler uses equivalence cache for nodes, we need |
| // to create it before adding node into scheduler cache. |
| if c.enableEquivalenceClassCache { |
| // GetNodeCache() will lazily create NodeCache for given node if it does not exist. |
| c.equivalencePodCache.GetNodeCache(node.GetName()) |
| } |
| |
| if err := c.schedulerCache.AddNode(node); err != nil { |
| klog.Errorf("scheduler cache AddNode failed: %v", err) |
| } |
| |
| c.podQueue.MoveAllToActiveQueue() |
| // NOTE: add a new node does not affect existing predicates in equivalence cache |
| } |
| |
| func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) { |
| oldNode, ok := oldObj.(*v1.Node) |
| if !ok { |
| klog.Errorf("cannot convert oldObj to *v1.Node: %v", oldObj) |
| return |
| } |
| newNode, ok := newObj.(*v1.Node) |
| if !ok { |
| klog.Errorf("cannot convert newObj to *v1.Node: %v", newObj) |
| return |
| } |
| |
| // NOTE: Updates must be written to scheduler cache before invalidating |
| // equivalence cache, because we could snapshot equivalence cache after the |
| // invalidation and then snapshot the cache itself. If the cache is |
| // snapshotted before updates are written, we would update equivalence |
| // cache with stale information which is based on snapshot of old cache. |
| if err := c.schedulerCache.UpdateNode(oldNode, newNode); err != nil { |
| klog.Errorf("scheduler cache UpdateNode failed: %v", err) |
| } |
| |
| c.invalidateCachedPredicatesOnNodeUpdate(newNode, oldNode) |
| c.podQueue.MoveAllToActiveQueue() |
| } |
| |
// invalidateCachedPredicatesOnNodeUpdate computes, from the diff between the
// old and new node objects, the set of predicates whose cached results for
// this node can no longer be trusted, and invalidates exactly that set in the
// equivalence cache. It is a no-op when the equivalence cache is disabled.
func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) {
	if c.enableEquivalenceClassCache {
		// Begin to update equivalence cache based on node update
		// TODO(resouer): think about lazily initialize this set
		invalidPredicates := sets.NewString()

		// Allocatable capacity feeds resource-fit checks.
		if !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) {
			invalidPredicates.Insert(predicates.GeneralPred) // "PodFitsResources"
		}
		if !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) {
			invalidPredicates.Insert(predicates.GeneralPred, predicates.CheckServiceAffinityPred) // "PodSelectorMatches"
			for k, v := range oldNode.GetLabels() {
				// any label can be topology key of pod, we have to invalidate in all cases
				if v != newNode.GetLabels()[k] {
					invalidPredicates.Insert(predicates.MatchInterPodAffinityPred)
				}
				// NoVolumeZoneConflict will only be affected by zone related label change
				if isZoneRegionLabel(k) {
					if v != newNode.GetLabels()[k] {
						invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
					}
				}
			}
		}

		// Taints may come either from annotations (legacy) or from the spec;
		// a change in either invalidates the toleration predicate.
		oldTaints, oldErr := helper.GetTaintsFromNodeAnnotations(oldNode.GetAnnotations())
		if oldErr != nil {
			klog.Errorf("Failed to get taints from old node annotation for equivalence cache")
		}
		newTaints, newErr := helper.GetTaintsFromNodeAnnotations(newNode.GetAnnotations())
		if newErr != nil {
			klog.Errorf("Failed to get taints from new node annotation for equivalence cache")
		}
		if !reflect.DeepEqual(oldTaints, newTaints) ||
			!reflect.DeepEqual(oldNode.Spec.Taints, newNode.Spec.Taints) {
			invalidPredicates.Insert(predicates.PodToleratesNodeTaintsPred)
		}

		// Map each condition type to its status so individual condition flips
		// can be detected regardless of ordering in the conditions slice.
		if !reflect.DeepEqual(oldNode.Status.Conditions, newNode.Status.Conditions) {
			oldConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
			newConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
			for _, cond := range oldNode.Status.Conditions {
				oldConditions[cond.Type] = cond.Status
			}
			for _, cond := range newNode.Status.Conditions {
				newConditions[cond.Type] = cond.Status
			}
			if oldConditions[v1.NodeMemoryPressure] != newConditions[v1.NodeMemoryPressure] {
				invalidPredicates.Insert(predicates.CheckNodeMemoryPressurePred)
			}
			if oldConditions[v1.NodeDiskPressure] != newConditions[v1.NodeDiskPressure] {
				invalidPredicates.Insert(predicates.CheckNodeDiskPressurePred)
			}
			if oldConditions[v1.NodePIDPressure] != newConditions[v1.NodePIDPressure] {
				invalidPredicates.Insert(predicates.CheckNodePIDPressurePred)
			}
			if oldConditions[v1.NodeReady] != newConditions[v1.NodeReady] ||
				oldConditions[v1.NodeOutOfDisk] != newConditions[v1.NodeOutOfDisk] ||
				oldConditions[v1.NodeNetworkUnavailable] != newConditions[v1.NodeNetworkUnavailable] {
				invalidPredicates.Insert(predicates.CheckNodeConditionPred)
			}
		}
		// Toggling schedulability is also surfaced via CheckNodeCondition.
		if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable {
			invalidPredicates.Insert(predicates.CheckNodeConditionPred)
		}
		c.equivalencePodCache.InvalidatePredicatesOnNode(newNode.GetName(), invalidPredicates)
	}
}
| |
| func (c *configFactory) deleteNodeFromCache(obj interface{}) { |
| var node *v1.Node |
| switch t := obj.(type) { |
| case *v1.Node: |
| node = t |
| case cache.DeletedFinalStateUnknown: |
| var ok bool |
| node, ok = t.Obj.(*v1.Node) |
| if !ok { |
| klog.Errorf("cannot convert to *v1.Node: %v", t.Obj) |
| return |
| } |
| default: |
| klog.Errorf("cannot convert to *v1.Node: %v", t) |
| return |
| } |
| // NOTE: Updates must be written to scheduler cache before invalidating |
| // equivalence cache, because we could snapshot equivalence cache after the |
| // invalidation and then snapshot the cache itself. If the cache is |
| // snapshotted before updates are written, we would update equivalence |
| // cache with stale information which is based on snapshot of old cache. |
| if err := c.schedulerCache.RemoveNode(node); err != nil { |
| klog.Errorf("scheduler cache RemoveNode failed: %v", err) |
| } |
| if c.enableEquivalenceClassCache { |
| c.equivalencePodCache.InvalidateAllPredicatesOnNode(node.GetName()) |
| } |
| } |
| |
// Create creates a scheduler with the default algorithm provider.
// It is shorthand for CreateFromProvider(DefaultProvider).
func (c *configFactory) Create() (*Config, error) {
	return c.CreateFromProvider(DefaultProvider)
}
| |
| // Creates a scheduler from the name of a registered algorithm provider. |
| func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) { |
| klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName) |
| provider, err := GetAlgorithmProvider(providerName) |
| if err != nil { |
| return nil, err |
| } |
| return c.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{}) |
| } |
| |
| // Creates a scheduler from the configuration file |
| func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) { |
| klog.V(2).Infof("Creating scheduler from configuration: %v", policy) |
| |
| // validate the policy configuration |
| if err := validation.ValidatePolicy(policy); err != nil { |
| return nil, err |
| } |
| |
| predicateKeys := sets.NewString() |
| if policy.Predicates == nil { |
| klog.V(2).Infof("Using predicates from algorithm provider '%v'", DefaultProvider) |
| provider, err := GetAlgorithmProvider(DefaultProvider) |
| if err != nil { |
| return nil, err |
| } |
| predicateKeys = provider.FitPredicateKeys |
| } else { |
| for _, predicate := range policy.Predicates { |
| klog.V(2).Infof("Registering predicate: %s", predicate.Name) |
| predicateKeys.Insert(RegisterCustomFitPredicate(predicate)) |
| } |
| } |
| |
| priorityKeys := sets.NewString() |
| if policy.Priorities == nil { |
| klog.V(2).Infof("Using priorities from algorithm provider '%v'", DefaultProvider) |
| provider, err := GetAlgorithmProvider(DefaultProvider) |
| if err != nil { |
| return nil, err |
| } |
| priorityKeys = provider.PriorityFunctionKeys |
| } else { |
| for _, priority := range policy.Priorities { |
| klog.V(2).Infof("Registering priority: %s", priority.Name) |
| priorityKeys.Insert(RegisterCustomPriorityFunction(priority)) |
| } |
| } |
| |
| var extenders []algorithm.SchedulerExtender |
| if len(policy.ExtenderConfigs) != 0 { |
| ignoredExtendedResources := sets.NewString() |
| for ii := range policy.ExtenderConfigs { |
| klog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii]) |
| extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii]) |
| if err != nil { |
| return nil, err |
| } |
| extenders = append(extenders, extender) |
| for _, r := range policy.ExtenderConfigs[ii].ManagedResources { |
| if r.IgnoredByScheduler { |
| ignoredExtendedResources.Insert(string(r.Name)) |
| } |
| } |
| } |
| predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources) |
| } |
| // Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value. |
| // Give it higher precedence than scheduler CLI configuration when it is provided. |
| if policy.HardPodAffinitySymmetricWeight != 0 { |
| c.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight |
| } |
| // When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured |
| // predicates even after one or more of them fails. |
| if policy.AlwaysCheckAllPredicates { |
| c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates |
| } |
| |
| return c.CreateFromKeys(predicateKeys, priorityKeys, extenders) |
| } |
| |
| // getBinderFunc returns an func which returns an extender that supports bind or a default binder based on the given pod. |
| func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder { |
| var extenderBinder algorithm.SchedulerExtender |
| for i := range extenders { |
| if extenders[i].IsBinder() { |
| extenderBinder = extenders[i] |
| break |
| } |
| } |
| defaultBinder := &binder{c.client} |
| return func(pod *v1.Pod) Binder { |
| if extenderBinder != nil && extenderBinder.IsInterested(pod) { |
| return extenderBinder |
| } |
| return defaultBinder |
| } |
| } |
| |
| // Creates a scheduler from a set of registered fit predicate keys and priority keys. |
| func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) { |
| klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys) |
| |
| if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 { |
| return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight()) |
| } |
| |
| predicateFuncs, err := c.GetPredicates(predicateKeys) |
| if err != nil { |
| return nil, err |
| } |
| |
| priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys) |
| if err != nil { |
| return nil, err |
| } |
| |
| priorityMetaProducer, err := c.GetPriorityMetadataProducer() |
| if err != nil { |
| return nil, err |
| } |
| |
| predicateMetaProducer, err := c.GetPredicateMetadataProducer() |
| if err != nil { |
| return nil, err |
| } |
| |
| // Init equivalence class cache |
| if c.enableEquivalenceClassCache { |
| c.equivalencePodCache = equivalence.NewCache(predicates.Ordering()) |
| klog.Info("Created equivalence class cache") |
| } |
| |
| algo := core.NewGenericScheduler( |
| c.schedulerCache, |
| c.equivalencePodCache, |
| c.podQueue, |
| predicateFuncs, |
| predicateMetaProducer, |
| priorityConfigs, |
| priorityMetaProducer, |
| extenders, |
| c.volumeBinder, |
| c.pVCLister, |
| c.pdbLister, |
| c.alwaysCheckAllPredicates, |
| c.disablePreemption, |
| c.percentageOfNodesToScore, |
| ) |
| |
| podBackoff := util.CreateDefaultPodBackoff() |
| return &Config{ |
| SchedulerCache: c.schedulerCache, |
| Ecache: c.equivalencePodCache, |
| // The scheduler only needs to consider schedulable nodes. |
| NodeLister: &nodeLister{c.nodeLister}, |
| Algorithm: algo, |
| GetBinder: c.getBinderFunc(extenders), |
| PodConditionUpdater: &podConditionUpdater{c.client}, |
| PodPreemptor: &podPreemptor{c.client}, |
| WaitForCacheSync: func() bool { |
| return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced) |
| }, |
| NextPod: func() *v1.Pod { |
| return c.getNextPod() |
| }, |
| Error: c.MakeDefaultErrorFunc(podBackoff, c.podQueue), |
| StopEverything: c.StopEverything, |
| VolumeBinder: c.volumeBinder, |
| SchedulingQueue: c.podQueue, |
| }, nil |
| } |
| |
// nodeLister adapts a corelisters.NodeLister to the selector-less List()
// interface the scheduler's algorithm expects.
type nodeLister struct {
	corelisters.NodeLister
}

// List returns every node known to the underlying lister.
func (n *nodeLister) List() ([]*v1.Node, error) {
	return n.NodeLister.List(labels.Everything())
}
| |
| func (c *configFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) { |
| pluginArgs, err := c.getPluginArgs() |
| if err != nil { |
| return nil, err |
| } |
| |
| return getPriorityFunctionConfigs(priorityKeys, *pluginArgs) |
| } |
| |
| func (c *configFactory) GetPriorityMetadataProducer() (algorithm.PriorityMetadataProducer, error) { |
| pluginArgs, err := c.getPluginArgs() |
| if err != nil { |
| return nil, err |
| } |
| |
| return getPriorityMetadataProducer(*pluginArgs) |
| } |
| |
| func (c *configFactory) GetPredicateMetadataProducer() (algorithm.PredicateMetadataProducer, error) { |
| pluginArgs, err := c.getPluginArgs() |
| if err != nil { |
| return nil, err |
| } |
| return getPredicateMetadataProducer(*pluginArgs) |
| } |
| |
| func (c *configFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) { |
| pluginArgs, err := c.getPluginArgs() |
| if err != nil { |
| return nil, err |
| } |
| |
| return getFitPredicateFunctions(predicateKeys, *pluginArgs) |
| } |
| |
// getPluginArgs bundles the factory's listers, cached info accessors, volume
// binder, and affinity weight into the argument struct handed to predicate
// and priority plugin constructors. It never fails today; the error return
// exists for interface compatibility.
func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
	return &PluginFactoryArgs{
		PodLister:         c.podLister,
		ServiceLister:     c.serviceLister,
		ControllerLister:  c.controllerLister,
		ReplicaSetLister:  c.replicaSetLister,
		StatefulSetLister: c.statefulSetLister,
		NodeLister:        &nodeLister{c.nodeLister},
		PDBLister:         c.pdbLister,
		NodeInfo:          &predicates.CachedNodeInfo{NodeLister: c.nodeLister},
		PVInfo:            &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister},
		PVCInfo:           &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: c.pVCLister},
		StorageClassInfo:  &predicates.CachedStorageClassInfo{StorageClassLister: c.storageClassLister},
		VolumeBinder:      c.volumeBinder,
		HardPodAffinitySymmetricWeight: c.hardPodAffinitySymmetricWeight,
	}, nil
}
| |
| func (c *configFactory) getNextPod() *v1.Pod { |
| pod, err := c.podQueue.Pop() |
| if err == nil { |
| klog.V(4).Infof("About to try and schedule pod %v/%v", pod.Namespace, pod.Name) |
| return pod |
| } |
| klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err) |
| return nil |
| } |
| |
| // assignedPod selects pods that are assigned (scheduled and running). |
| func assignedPod(pod *v1.Pod) bool { |
| return len(pod.Spec.NodeName) != 0 |
| } |
| |
| // responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler. |
| func responsibleForPod(pod *v1.Pod, schedulerName string) bool { |
| return schedulerName == pod.Spec.SchedulerName |
| } |
| |
| // assignedPodLister filters the pods returned from a PodLister to |
| // only include those that have a node name set. |
| type assignedPodLister struct { |
| corelisters.PodLister |
| } |
| |
| // List lists all Pods in the indexer for a given namespace. |
| func (l assignedPodLister) List(selector labels.Selector) ([]*v1.Pod, error) { |
| list, err := l.PodLister.List(selector) |
| if err != nil { |
| return nil, err |
| } |
| filtered := make([]*v1.Pod, 0, len(list)) |
| for _, pod := range list { |
| if len(pod.Spec.NodeName) > 0 { |
| filtered = append(filtered, pod) |
| } |
| } |
| return filtered, nil |
| } |
| |
// Pods returns a PodNamespaceLister for the given namespace whose results
// are likewise filtered to assigned pods.
func (l assignedPodLister) Pods(namespace string) corelisters.PodNamespaceLister {
	return assignedPodNamespaceLister{l.PodLister.Pods(namespace)}
}
| |
| // assignedPodNamespaceLister filters the pods returned from a PodNamespaceLister to |
| // only include those that have a node name set. |
| type assignedPodNamespaceLister struct { |
| corelisters.PodNamespaceLister |
| } |
| |
| // List lists all Pods in the indexer for a given namespace. |
| func (l assignedPodNamespaceLister) List(selector labels.Selector) (ret []*v1.Pod, err error) { |
| list, err := l.PodNamespaceLister.List(selector) |
| if err != nil { |
| return nil, err |
| } |
| filtered := make([]*v1.Pod, 0, len(list)) |
| for _, pod := range list { |
| if len(pod.Spec.NodeName) > 0 { |
| filtered = append(filtered, pod) |
| } |
| } |
| return filtered, nil |
| } |
| |
| // Get retrieves the Pod from the indexer for a given namespace and name. |
| func (l assignedPodNamespaceLister) Get(name string) (*v1.Pod, error) { |
| pod, err := l.PodNamespaceLister.Get(name) |
| if err != nil { |
| return nil, err |
| } |
| if len(pod.Spec.NodeName) > 0 { |
| return pod, nil |
| } |
| return nil, errors.NewNotFound(schema.GroupResource{Resource: string(v1.ResourcePods)}, name) |
| } |
| |
// podInformer wraps a SharedIndexInformer so it satisfies the
// coreinformers.PodInformer interface.
type podInformer struct {
	informer cache.SharedIndexInformer
}

// Informer returns the wrapped shared index informer.
func (i *podInformer) Informer() cache.SharedIndexInformer {
	return i.informer
}

// Lister returns a PodLister backed by the informer's indexer.
func (i *podInformer) Lister() corelisters.PodLister {
	return corelisters.NewPodLister(i.informer.GetIndexer())
}
| |
| // NewPodInformer creates a shared index informer that returns only non-terminal pods. |
| func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) coreinformers.PodInformer { |
| selector := fields.ParseSelectorOrDie( |
| "status.phase!=" + string(v1.PodSucceeded) + |
| ",status.phase!=" + string(v1.PodFailed)) |
| lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), string(v1.ResourcePods), metav1.NamespaceAll, selector) |
| return &podInformer{ |
| informer: cache.NewSharedIndexInformer(lw, &v1.Pod{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), |
| } |
| } |
| |
| func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error) { |
| return func(pod *v1.Pod, err error) { |
| if err == core.ErrNoNodesAvailable { |
| klog.V(4).Infof("Unable to schedule %v/%v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name) |
| } else { |
| if _, ok := err.(*core.FitError); ok { |
| klog.V(4).Infof("Unable to schedule %v/%v: no fit: %v; waiting", pod.Namespace, pod.Name, err) |
| } else if errors.IsNotFound(err) { |
| if errStatus, ok := err.(errors.APIStatus); ok && errStatus.Status().Details.Kind == "node" { |
| nodeName := errStatus.Status().Details.Name |
| // when node is not found, We do not remove the node right away. Trying again to get |
| // the node and if the node is still not found, then remove it from the scheduler cache. |
| _, err := c.client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) |
| if err != nil && errors.IsNotFound(err) { |
| node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} |
| // NOTE: Updates must be written to scheduler cache before invalidating |
| // equivalence cache, because we could snapshot equivalence cache after the |
| // invalidation and then snapshot the cache itself. If the cache is |
| // snapshotted before updates are written, we would update equivalence |
| // cache with stale information which is based on snapshot of old cache. |
| c.schedulerCache.RemoveNode(&node) |
| // invalidate cached predicate for the node |
| if c.enableEquivalenceClassCache { |
| c.equivalencePodCache.InvalidateAllPredicatesOnNode(nodeName) |
| } |
| } |
| } |
| } else { |
| klog.Errorf("Error scheduling %v/%v: %v; retrying", pod.Namespace, pod.Name, err) |
| } |
| } |
| |
| backoff.Gc() |
| // Retry asynchronously. |
| // Note that this is extremely rudimentary and we need a more real error handling path. |
| go func() { |
| defer runtime.HandleCrash() |
| podID := types.NamespacedName{ |
| Namespace: pod.Namespace, |
| Name: pod.Name, |
| } |
| origPod := pod |
| |
| // When pod priority is enabled, we would like to place an unschedulable |
| // pod in the unschedulable queue. This ensures that if the pod is nominated |
| // to run on a node, scheduler takes the pod into account when running |
| // predicates for the node. |
| if !util.PodPriorityEnabled() { |
| entry := backoff.GetEntry(podID) |
| if !entry.TryWait(backoff.MaxDuration()) { |
| klog.Warningf("Request for pod %v already in flight, abandoning", podID) |
| return |
| } |
| } |
| // Get the pod again; it may have changed/been scheduled already. |
| getBackoff := initialGetBackoff |
| for { |
| pod, err := c.client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{}) |
| if err == nil { |
| if len(pod.Spec.NodeName) == 0 { |
| podQueue.AddUnschedulableIfNotPresent(pod) |
| } else { |
| if c.volumeBinder != nil { |
| // Volume binder only wants to keep unassigned pods |
| c.volumeBinder.DeletePodBindings(pod) |
| } |
| } |
| break |
| } |
| if errors.IsNotFound(err) { |
| klog.Warningf("A pod %v no longer exists", podID) |
| |
| if c.volumeBinder != nil { |
| // Volume binder only wants to keep unassigned pods |
| c.volumeBinder.DeletePodBindings(origPod) |
| } |
| return |
| } |
| klog.Errorf("Error getting pod %v for retry: %v; retrying...", podID, err) |
| if getBackoff = getBackoff * 2; getBackoff > maximalGetBackoff { |
| getBackoff = maximalGetBackoff |
| } |
| time.Sleep(getBackoff) |
| } |
| }() |
| } |
| } |
| |
| // nodeEnumerator allows a cache.Poller to enumerate items in an v1.NodeList |
| type nodeEnumerator struct { |
| *v1.NodeList |
| } |
| |
| // Len returns the number of items in the node list. |
| func (ne *nodeEnumerator) Len() int { |
| if ne.NodeList == nil { |
| return 0 |
| } |
| return len(ne.Items) |
| } |
| |
| // Get returns the item (and ID) with the particular index. |
| func (ne *nodeEnumerator) Get(index int) interface{} { |
| return &ne.Items[index] |
| } |
| |
| type binder struct { |
| Client clientset.Interface |
| } |
| |
| // Bind just does a POST binding RPC. |
| func (b *binder) Bind(binding *v1.Binding) error { |
| klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name) |
| return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding) |
| } |
| |
| type podConditionUpdater struct { |
| Client clientset.Interface |
| } |
| |
| func (p *podConditionUpdater) Update(pod *v1.Pod, condition *v1.PodCondition) error { |
| klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status) |
| if podutil.UpdatePodCondition(&pod.Status, condition) { |
| _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) |
| return err |
| } |
| return nil |
| } |
| |
| type podPreemptor struct { |
| Client clientset.Interface |
| } |
| |
| func (p *podPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) { |
| return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}) |
| } |
| |
| func (p *podPreemptor) DeletePod(pod *v1.Pod) error { |
| return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{}) |
| } |
| |
| func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error { |
| podCopy := pod.DeepCopy() |
| podCopy.Status.NominatedNodeName = nominatedNodeName |
| _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy) |
| return err |
| } |
| |
| func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error { |
| if len(pod.Status.NominatedNodeName) == 0 { |
| return nil |
| } |
| return p.SetNominatedNodeName(pod, "") |
| } |