| /* |
| Copyright 2015 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package cache |
| |
| import ( |
| "fmt" |
| "sync" |
| "time" |
| |
| "k8s.io/api/core/v1" |
| "k8s.io/apimachinery/pkg/labels" |
| "k8s.io/apimachinery/pkg/util/sets" |
| "k8s.io/apimachinery/pkg/util/wait" |
| utilfeature "k8s.io/apiserver/pkg/util/feature" |
| "k8s.io/kubernetes/pkg/features" |
| schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" |
| |
| "k8s.io/klog" |
| ) |
| |
| var ( |
| cleanAssumedPeriod = 1 * time.Second |
| ) |
| |
| // New returns a Cache implementation. |
| // It automatically starts a goroutine that manages expiration of assumed pods. |
| // "ttl" is how long an assumed pod is kept before it expires. |
| // "stop" is the channel that, when closed, stops the background goroutine. |
| func New(ttl time.Duration, stop <-chan struct{}) Cache { |
| cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop) |
| cache.run() |
| return cache |
| } |
| |
| type schedulerCache struct { |
| stop <-chan struct{} |
| ttl time.Duration |
| period time.Duration |
| |
| // This mutex guards all fields within this cache struct. |
| mu sync.RWMutex |
| // A set of assumed pod keys. |
| // Each key can be used to look up the corresponding entry in podStates. |
| assumedPods map[string]bool |
| // A map from pod key to podState. |
| podStates map[string]*podState |
| nodes map[string]*schedulercache.NodeInfo |
| nodeTree *NodeTree |
| // A map from image name to its imageState. |
| imageStates map[string]*imageState |
| } |
| |
| type podState struct { |
| pod *v1.Pod |
| // Used by assumed pods to determine expiration. |
| deadline *time.Time |
| // Used to block the cache from expiring an assumed pod while its binding is still in progress. |
| bindingFinished bool |
| } |
| |
| type imageState struct { |
| // Size of the image |
| size int64 |
| // A set of node names for nodes having this image present |
| nodes sets.String |
| } |
| |
| // createImageStateSummary returns a summarizing snapshot of the given image's state. |
| func (cache *schedulerCache) createImageStateSummary(state *imageState) *schedulercache.ImageStateSummary { |
| return &schedulercache.ImageStateSummary{ |
| Size: state.size, |
| NumNodes: len(state.nodes), |
| } |
| } |
| |
| func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache { |
| return &schedulerCache{ |
| ttl: ttl, |
| period: period, |
| stop: stop, |
| |
| nodes: make(map[string]*schedulercache.NodeInfo), |
| nodeTree: newNodeTree(nil), |
| assumedPods: make(map[string]bool), |
| podStates: make(map[string]*podState), |
| imageStates: make(map[string]*imageState), |
| } |
| } |
| |
| // Snapshot takes a snapshot of the current scheduler cache. This method has a |
| // performance impact and should only be used in non-critical paths. |
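| // |
| // An illustrative sketch of reading the result (schedCache stands for any |
| // value implementing Cache; it is not a name used in this package): |
| // |
| //     snapshot := schedCache.Snapshot() |
| //     for name, nodeInfo := range snapshot.Nodes { |
| //         klog.V(5).Infof("node %q holds %d cached pods", name, len(nodeInfo.Pods())) |
| //     } |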
| func (cache *schedulerCache) Snapshot() *Snapshot { |
| cache.mu.RLock() |
| defer cache.mu.RUnlock() |
| |
| nodes := make(map[string]*schedulercache.NodeInfo) |
| for k, v := range cache.nodes { |
| nodes[k] = v.Clone() |
| } |
| |
| assumedPods := make(map[string]bool) |
| for k, v := range cache.assumedPods { |
| assumedPods[k] = v |
| } |
| |
| return &Snapshot{ |
| Nodes: nodes, |
| AssumedPods: assumedPods, |
| } |
| } |
| |
| func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*schedulercache.NodeInfo) error { |
| cache.mu.Lock() |
| defer cache.mu.Unlock() |
| |
| for name, info := range cache.nodes { |
| if utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && info.TransientInfo != nil { |
| // Transient scheduler info is reset here. |
| info.TransientInfo.ResetTransientSchedulerInfo() |
| } |
| if current, ok := nodeNameToInfo[name]; !ok || current.GetGeneration() != info.GetGeneration() { |
| nodeNameToInfo[name] = info.Clone() |
| } |
| } |
| for name := range nodeNameToInfo { |
| if _, ok := cache.nodes[name]; !ok { |
| delete(nodeNameToInfo, name) |
| } |
| } |
| return nil |
| } |
| |
| func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) { |
| alwaysTrue := func(p *v1.Pod) bool { return true } |
| return cache.FilteredList(alwaysTrue, selector) |
| } |
| |
| func (cache *schedulerCache) FilteredList(podFilter PodFilter, selector labels.Selector) ([]*v1.Pod, error) { |
| cache.mu.RLock() |
| defer cache.mu.RUnlock() |
| // podFilter is expected to return true for most or all of the pods. We |
| // can avoid expensive array growth without wasting too much memory by |
| // pre-allocating capacity. |
| maxSize := 0 |
| for _, info := range cache.nodes { |
| maxSize += len(info.Pods()) |
| } |
| pods := make([]*v1.Pod, 0, maxSize) |
| for _, info := range cache.nodes { |
| for _, pod := range info.Pods() { |
| if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) { |
| pods = append(pods, pod) |
| } |
| } |
| } |
| return pods, nil |
| } |
| |
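| // The assumed-pod lifecycle implemented by the methods below is roughly: |
| // AssumePod charges the pod against its node before binding completes; |
| // FinishBinding starts the TTL clock so the assumed pod may expire; AddPod |
| // confirms the pod (Assumed -> Added) and clears the expiration deadline; |
| // ForgetPod, or expiration of the TTL, rolls the assumed pod back out. |
| // |
| // An illustrative call sequence from a scheduling round (error handling |
| // omitted; schedCache and pod are placeholder names): |
| // |
| //     _ = schedCache.AssumePod(pod)     // reserve resources on the chosen node |
| //     _ = schedCache.FinishBinding(pod) // binding issued; the TTL now applies |
| //     // later, the informer's Add event results in schedCache.AddPod(pod). |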
| func (cache *schedulerCache) AssumePod(pod *v1.Pod) error { |
| key, err := schedulercache.GetPodKey(pod) |
| if err != nil { |
| return err |
| } |
| |
| cache.mu.Lock() |
| defer cache.mu.Unlock() |
| if _, ok := cache.podStates[key]; ok { |
| return fmt.Errorf("pod %v is in the cache, so can't be assumed", key) |
| } |
| |
| cache.addPod(pod) |
| ps := &podState{ |
| pod: pod, |
| } |
| cache.podStates[key] = ps |
| cache.assumedPods[key] = true |
| return nil |
| } |
| |
| func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error { |
| return cache.finishBinding(pod, time.Now()) |
| } |
| |
| // finishBinding exists to make tests deterministic by injecting "now" as an argument. |
| func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error { |
| key, err := schedulercache.GetPodKey(pod) |
| if err != nil { |
| return err |
| } |
| |
| cache.mu.RLock() |
| defer cache.mu.RUnlock() |
| |
| klog.V(5).Infof("Finished binding for pod %v. Can be expired.", key) |
| currState, ok := cache.podStates[key] |
| if ok && cache.assumedPods[key] { |
| dl := now.Add(cache.ttl) |
| currState.bindingFinished = true |
| currState.deadline = &dl |
| } |
| return nil |
| } |
| |
| func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error { |
| key, err := schedulercache.GetPodKey(pod) |
| if err != nil { |
| return err |
| } |
| |
| cache.mu.Lock() |
| defer cache.mu.Unlock() |
| |
| currState, ok := cache.podStates[key] |
| if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName { |
| return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName) |
| } |
| |
| switch { |
| // Only an assumed pod can be forgotten. |
| case ok && cache.assumedPods[key]: |
| err := cache.removePod(pod) |
| if err != nil { |
| return err |
| } |
| delete(cache.assumedPods, key) |
| delete(cache.podStates, key) |
| default: |
| return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key) |
| } |
| return nil |
| } |
| |
| // Assumes that lock is already acquired. |
| func (cache *schedulerCache) addPod(pod *v1.Pod) { |
| n, ok := cache.nodes[pod.Spec.NodeName] |
| if !ok { |
| n = schedulercache.NewNodeInfo() |
| cache.nodes[pod.Spec.NodeName] = n |
| } |
| n.AddPod(pod) |
| } |
| |
| // Assumes that lock is already acquired. |
| func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error { |
| if err := cache.removePod(oldPod); err != nil { |
| return err |
| } |
| cache.addPod(newPod) |
| return nil |
| } |
| |
| // Assumes that lock is already acquired. |
| func (cache *schedulerCache) removePod(pod *v1.Pod) error { |
| n := cache.nodes[pod.Spec.NodeName] |
| if err := n.RemovePod(pod); err != nil { |
| return err |
| } |
| if len(n.Pods()) == 0 && n.Node() == nil { |
| delete(cache.nodes, pod.Spec.NodeName) |
| } |
| return nil |
| } |
| |
| func (cache *schedulerCache) AddPod(pod *v1.Pod) error { |
| key, err := schedulercache.GetPodKey(pod) |
| if err != nil { |
| return err |
| } |
| |
| cache.mu.Lock() |
| defer cache.mu.Unlock() |
| |
| currState, ok := cache.podStates[key] |
| switch { |
| case ok && cache.assumedPods[key]: |
| if currState.pod.Spec.NodeName != pod.Spec.NodeName { |
| // The pod was added to a different node than it was assumed to be on. |
| klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName) |
| // Clean this up. |
| cache.removePod(currState.pod) |
| cache.addPod(pod) |
| } |
| delete(cache.assumedPods, key) |
| cache.podStates[key].deadline = nil |
| cache.podStates[key].pod = pod |
| case !ok: |
| // Pod was expired. We should add it back. |
| cache.addPod(pod) |
| ps := &podState{ |
| pod: pod, |
| } |
| cache.podStates[key] = ps |
| default: |
| return fmt.Errorf("pod %v was already in added state", key) |
| } |
| return nil |
| } |
| |
| func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error { |
| key, err := schedulercache.GetPodKey(oldPod) |
| if err != nil { |
| return err |
| } |
| |
| cache.mu.Lock() |
| defer cache.mu.Unlock() |
| |
| currState, ok := cache.podStates[key] |
| switch { |
| // An assumed pod won't have an Update/Remove event. It needs an Add event |
| // first, after which its state changes from Assumed to Added. |
| case ok && !cache.assumedPods[key]: |
| if currState.pod.Spec.NodeName != newPod.Spec.NodeName { |
| klog.Errorf("Pod %v updated on a different node than previously added to.", key) |
| klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions") |
| } |
| if err := cache.updatePod(oldPod, newPod); err != nil { |
| return err |
| } |
| currState.pod = newPod |
| default: |
| return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key) |
| } |
| return nil |
| } |
| |
| func (cache *schedulerCache) RemovePod(pod *v1.Pod) error { |
| key, err := schedulercache.GetPodKey(pod) |
| if err != nil { |
| return err |
| } |
| |
| cache.mu.Lock() |
| defer cache.mu.Unlock() |
| |
| currState, ok := cache.podStates[key] |
| switch { |
| // An assumed pod won't have a Delete/Remove event. It needs an Add event |
| // first, after which its state changes from Assumed to Added. |
| case ok && !cache.assumedPods[key]: |
| if currState.pod.Spec.NodeName != pod.Spec.NodeName { |
| klog.Errorf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName) |
| klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions") |
| } |
| err := cache.removePod(currState.pod) |
| if err != nil { |
| return err |
| } |
| delete(cache.podStates, key) |
| default: |
| return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key) |
| } |
| return nil |
| } |
| |
| func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) { |
| key, err := schedulercache.GetPodKey(pod) |
| if err != nil { |
| return false, err |
| } |
| |
| cache.mu.RLock() |
| defer cache.mu.RUnlock() |
| |
| b, found := cache.assumedPods[key] |
| if !found { |
| return false, nil |
| } |
| return b, nil |
| } |
| |
| func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) { |
| key, err := schedulercache.GetPodKey(pod) |
| if err != nil { |
| return nil, err |
| } |
| |
| cache.mu.RLock() |
| defer cache.mu.RUnlock() |
| |
| podState, ok := cache.podStates[key] |
| if !ok { |
| return nil, fmt.Errorf("pod %v does not exist in scheduler cache", key) |
| } |
| |
| return podState.pod, nil |
| } |
| |
| func (cache *schedulerCache) AddNode(node *v1.Node) error { |
| cache.mu.Lock() |
| defer cache.mu.Unlock() |
| |
| n, ok := cache.nodes[node.Name] |
| if !ok { |
| n = schedulercache.NewNodeInfo() |
| cache.nodes[node.Name] = n |
| } else { |
| cache.removeNodeImageStates(n.Node()) |
| } |
| |
| cache.nodeTree.AddNode(node) |
| cache.addNodeImageStates(node, n) |
| return n.SetNode(node) |
| } |
| |
| func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error { |
| cache.mu.Lock() |
| defer cache.mu.Unlock() |
| |
| n, ok := cache.nodes[newNode.Name] |
| if !ok { |
| n = schedulercache.NewNodeInfo() |
| cache.nodes[newNode.Name] = n |
| } else { |
| cache.removeNodeImageStates(n.Node()) |
| } |
| |
| cache.nodeTree.UpdateNode(oldNode, newNode) |
| cache.addNodeImageStates(newNode, n) |
| return n.SetNode(newNode) |
| } |
| |
| func (cache *schedulerCache) RemoveNode(node *v1.Node) error { |
| cache.mu.Lock() |
| defer cache.mu.Unlock() |
| |
| n := cache.nodes[node.Name] |
| if err := n.RemoveNode(node); err != nil { |
| return err |
| } |
| // We remove NodeInfo for this node only if there aren't any pods on this node. |
| // We can't do it unconditionally, because notifications about pods are delivered |
| // in a different watch, and thus can potentially be observed later, even though |
| // they happened before node removal. |
| if len(n.Pods()) == 0 && n.Node() == nil { |
| delete(cache.nodes, node.Name) |
| } |
| |
| cache.nodeTree.RemoveNode(node) |
| cache.removeNodeImageStates(node) |
| return nil |
| } |
| |
| // addNodeImageStates adds the states of the images on the given node to the given nodeInfo and updates the |
| // imageStates in the scheduler cache. This function assumes the scheduler cache lock has already been acquired. |
| func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulercache.NodeInfo) { |
| newSum := make(map[string]*schedulercache.ImageStateSummary) |
| |
| for _, image := range node.Status.Images { |
| for _, name := range image.Names { |
| // update the entry in imageStates |
| state, ok := cache.imageStates[name] |
| if !ok { |
| state = &imageState{ |
| size: image.SizeBytes, |
| nodes: sets.NewString(node.Name), |
| } |
| cache.imageStates[name] = state |
| } else { |
| state.nodes.Insert(node.Name) |
| } |
| // create the imageStateSummary for this image |
| if _, ok := newSum[name]; !ok { |
| newSum[name] = cache.createImageStateSummary(state) |
| } |
| } |
| } |
| nodeInfo.SetImageStates(newSum) |
| } |
| |
| // removeNodeImageStates removes the given node from every image entry in the |
| // imageStates cache that references it. If an image is no longer present on any |
| // node after the removal, its entry is deleted from imageStates. |
| func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) { |
| if node == nil { |
| return |
| } |
| |
| for _, image := range node.Status.Images { |
| for _, name := range image.Names { |
| state, ok := cache.imageStates[name] |
| if ok { |
| state.nodes.Delete(node.Name) |
| if len(state.nodes) == 0 { |
| // Remove the unused image to make sure the length of |
| // imageStates represents the total number of different |
| // images on all nodes |
| delete(cache.imageStates, name) |
| } |
| } |
| } |
| } |
| } |
| |
| func (cache *schedulerCache) run() { |
| go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop) |
| } |
| |
| func (cache *schedulerCache) cleanupExpiredAssumedPods() { |
| cache.cleanupAssumedPods(time.Now()) |
| } |
| |
| // cleanupAssumedPods exists to make tests deterministic by taking the current time as an argument. |
| func (cache *schedulerCache) cleanupAssumedPods(now time.Time) { |
| cache.mu.Lock() |
| defer cache.mu.Unlock() |
| |
| // The size of assumedPods should be small |
| for key := range cache.assumedPods { |
| ps, ok := cache.podStates[key] |
| if !ok { |
| panic("Key found in assumed set but not in podStates. Potentially a logical error.") |
| } |
| if !ps.bindingFinished { |
| klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.", |
| ps.pod.Namespace, ps.pod.Name) |
| continue |
| } |
| if now.After(*ps.deadline) { |
| klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name) |
| if err := cache.expirePod(key, ps); err != nil { |
| klog.Errorf("ExpirePod failed for %s: %v", key, err) |
| } |
| } |
| } |
| } |
| |
| func (cache *schedulerCache) expirePod(key string, ps *podState) error { |
| if err := cache.removePod(ps.pod); err != nil { |
| return err |
| } |
| delete(cache.assumedPods, key) |
| delete(cache.podStates, key) |
| return nil |
| } |
| |
| func (cache *schedulerCache) NodeTree() *NodeTree { |
| return cache.nodeTree |
| } |