blob: d776981999bd41ebcc5838c40464d0c7115181e2 [file] [log] [blame]
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package equivalence defines Pod equivalence classes and the equivalence class
// cache.
package equivalence
import (
"fmt"
"hash/fnv"
"sync"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/metrics"
hashutil "k8s.io/kubernetes/pkg/util/hash"
)
// nodeMap stores a *NodeCache for each node.
type nodeMap map[string]*NodeCache
// Cache is a thread safe map saves and reuses the output of predicate functions,
// it uses node name as key to access those cached results.
//
// Internally, results are keyed by predicate name, and "equivalence
// class". (Equivalence class is defined in the `Class` type.) Saved results
// will be reused until an appropriate invalidation function is called.
type Cache struct {
// NOTE(harry): Theoretically sync.Map has better performance in machine with 8+ CPUs, while
// the reality is lock contention in first level cache is rare.
mu sync.RWMutex
nodeToCache nodeMap
predicateIDMap map[string]int
}
// NewCache create an empty equiv class cache.
func NewCache(predicates []string) *Cache {
predicateIDMap := make(map[string]int, len(predicates))
for id, predicate := range predicates {
predicateIDMap[predicate] = id
}
return &Cache{
nodeToCache: make(nodeMap),
predicateIDMap: predicateIDMap,
}
}
// NodeCache saves and reuses the output of predicate functions. Use RunPredicate to
// get or update the cached results. An appropriate Invalidate* function should
// be called when some predicate results are no longer valid.
//
// Internally, results are keyed by predicate name, and "equivalence
// class". (Equivalence class is defined in the `Class` type.) Saved results
// will be reused until an appropriate invalidation function is called.
//
// NodeCache objects are thread safe within the context of NodeCache,
type NodeCache struct {
mu sync.RWMutex
cache predicateMap
// generation is current generation of node cache, incremented on node
// invalidation.
generation uint64
// snapshotGeneration saves snapshot of generation of node cache.
snapshotGeneration uint64
// predicateGenerations stores generation numbers for predicates, incremented on
// predicate invalidation. Created on first update. Use 0 if does not
// exist.
predicateGenerations []uint64
// snapshotPredicateGenerations saves snapshot of generation numbers for predicates.
snapshotPredicateGenerations []uint64
}
// newNodeCache returns an empty NodeCache.
func newNodeCache(n int) *NodeCache {
return &NodeCache{
cache: make(predicateMap, n),
predicateGenerations: make([]uint64, n),
snapshotPredicateGenerations: make([]uint64, n),
}
}
// Snapshot snapshots current generations of cache.
// NOTE: We snapshot generations of all node caches before using it and these
// operations are serialized, we can save snapshot as member of node cache
// itself.
func (c *Cache) Snapshot() {
c.mu.RLock()
defer c.mu.RUnlock()
for _, n := range c.nodeToCache {
n.mu.Lock()
// snapshot predicate generations
copy(n.snapshotPredicateGenerations, n.predicateGenerations)
// snapshot node generation
n.snapshotGeneration = n.generation
n.mu.Unlock()
}
return
}
// GetNodeCache returns the existing NodeCache for given node if present. Otherwise,
// it creates the NodeCache and returns it.
// The boolean flag is true if the value was loaded, false if created.
func (c *Cache) GetNodeCache(name string) (nodeCache *NodeCache, exists bool) {
c.mu.Lock()
defer c.mu.Unlock()
if nodeCache, exists = c.nodeToCache[name]; !exists {
nodeCache = newNodeCache(len(c.predicateIDMap))
c.nodeToCache[name] = nodeCache
}
return
}
// LoadNodeCache returns the existing NodeCache for given node, nil if not
// present.
func (c *Cache) LoadNodeCache(node string) *NodeCache {
c.mu.RLock()
defer c.mu.RUnlock()
return c.nodeToCache[node]
}
func (c *Cache) predicateKeysToIDs(predicateKeys sets.String) []int {
predicateIDs := make([]int, 0, len(predicateKeys))
for predicateKey := range predicateKeys {
if id, ok := c.predicateIDMap[predicateKey]; ok {
predicateIDs = append(predicateIDs, id)
} else {
klog.Errorf("predicate key %q not found", predicateKey)
}
}
return predicateIDs
}
// InvalidatePredicates clears all cached results for the given predicates.
func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
if len(predicateKeys) == 0 {
return
}
c.mu.RLock()
defer c.mu.RUnlock()
predicateIDs := c.predicateKeysToIDs(predicateKeys)
for _, n := range c.nodeToCache {
n.invalidatePreds(predicateIDs)
}
klog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys)
}
// InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) {
if len(predicateKeys) == 0 {
return
}
c.mu.RLock()
defer c.mu.RUnlock()
predicateIDs := c.predicateKeysToIDs(predicateKeys)
if n, ok := c.nodeToCache[nodeName]; ok {
n.invalidatePreds(predicateIDs)
}
klog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys)
}
// InvalidateAllPredicatesOnNode clears all cached results for one node.
func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) {
c.mu.RLock()
defer c.mu.RUnlock()
if node, ok := c.nodeToCache[nodeName]; ok {
node.invalidate()
}
klog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName)
}
// InvalidateCachedPredicateItemForPodAdd is a wrapper of
// InvalidateCachedPredicateItem for pod add case
// TODO: This does not belong with the equivalence cache implementation.
func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
// MatchInterPodAffinity: we assume scheduler can make sure newly bound pod
// will not break the existing inter pod affinity. So we does not need to
// invalidate MatchInterPodAffinity when pod added.
//
// But when a pod is deleted, existing inter pod affinity may become invalid.
// (e.g. this pod was preferred by some else, or vice versa)
//
// NOTE: assumptions above will not stand when we implemented features like
// RequiredDuringSchedulingRequiredDuringExecutioc.
// NoDiskConflict: the newly scheduled pod fits to existing pods on this node,
// it will also fits to equivalence class of existing pods
// GeneralPredicates: will always be affected by adding a new pod
invalidPredicates := sets.NewString(predicates.GeneralPred)
// MaxPDVolumeCountPredicate: we check the volumes of pod to make decisioc.
for _, vol := range pod.Spec.Volumes {
if vol.PersistentVolumeClaim != nil {
invalidPredicates.Insert(
predicates.MaxEBSVolumeCountPred,
predicates.MaxGCEPDVolumeCountPred,
predicates.MaxAzureDiskVolumeCountPred)
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred)
}
} else {
// We do not consider CSI volumes here because CSI
// volumes can not be used inline.
if vol.AWSElasticBlockStore != nil {
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
}
if vol.GCEPersistentDisk != nil {
invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
}
if vol.AzureDisk != nil {
invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
}
}
}
c.InvalidatePredicatesOnNode(nodeName, invalidPredicates)
}
// Class represents a set of pods which are equivalent from the perspective of
// the scheduler. i.e. the scheduler would make the same decision for any pod
// from the same class.
type Class struct {
// Equivalence hash
hash uint64
}
// NewClass returns the equivalence class for a given Pod. The returned Class
// objects will be equal for two Pods in the same class. nil values should not
// be considered equal to each other.
//
// NOTE: Make sure to compare types of Class and not *Class.
// TODO(misterikkit): Return error instead of nil *Class.
func NewClass(pod *v1.Pod) *Class {
equivalencePod := getEquivalencePod(pod)
if equivalencePod != nil {
hash := fnv.New32a()
hashutil.DeepHashObject(hash, equivalencePod)
return &Class{
hash: uint64(hash.Sum32()),
}
}
return nil
}
// predicateMap stores resultMaps with predicate ID as the key.
type predicateMap []resultMap
// resultMap stores PredicateResult with pod equivalence hash as the key.
type resultMap map[uint64]predicateResult
// predicateResult stores the output of a FitPredicate.
type predicateResult struct {
Fit bool
FailReasons []algorithm.PredicateFailureReason
}
// RunPredicate returns a cached predicate result. In case of a cache miss, the predicate will be
// run and its results cached for the next call.
//
// NOTE: RunPredicate will not update the equivalence cache if generation does not match live version.
func (n *NodeCache) RunPredicate(
pred algorithm.FitPredicate,
predicateKey string,
predicateID int,
pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo,
equivClass *Class,
) (bool, []algorithm.PredicateFailureReason, error) {
if nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen during tests.
return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid")
}
result, ok := n.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, predicateID, equivClass.hash)
if ok {
return result.Fit, result.FailReasons, nil
}
fit, reasons, err := pred(pod, meta, nodeInfo)
if err != nil {
return fit, reasons, err
}
n.updateResult(pod.GetName(), predicateKey, predicateID, fit, reasons, equivClass.hash, nodeInfo)
return fit, reasons, nil
}
// updateResult updates the cached result of a predicate.
func (n *NodeCache) updateResult(
podName, predicateKey string,
predicateID int,
fit bool,
reasons []algorithm.PredicateFailureReason,
equivalenceHash uint64,
nodeInfo *schedulercache.NodeInfo,
) {
if nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen during tests.
metrics.EquivalenceCacheWrites.WithLabelValues("discarded_bad_node").Inc()
return
}
predicateItem := predicateResult{
Fit: fit,
FailReasons: reasons,
}
n.mu.Lock()
defer n.mu.Unlock()
if (n.snapshotGeneration != n.generation) || (n.snapshotPredicateGenerations[predicateID] != n.predicateGenerations[predicateID]) {
// Generation of node or predicate has been updated since we last took
// a snapshot, this indicates that we received a invalidation request
// during this time. Cache may be stale, skip update.
metrics.EquivalenceCacheWrites.WithLabelValues("discarded_stale").Inc()
return
}
// If cached predicate map already exists, just update the predicate by key
if predicates := n.cache[predicateID]; predicates != nil {
// maps in golang are references, no need to add them back
predicates[equivalenceHash] = predicateItem
} else {
n.cache[predicateID] =
resultMap{
equivalenceHash: predicateItem,
}
}
n.predicateGenerations[predicateID]++
klog.V(5).Infof("Cache update: node=%s, predicate=%s,pod=%s,value=%v",
nodeInfo.Node().Name, predicateKey, podName, predicateItem)
}
// lookupResult returns cached predicate results and a bool saying whether a
// cache entry was found.
func (n *NodeCache) lookupResult(
podName, nodeName, predicateKey string,
predicateID int,
equivalenceHash uint64,
) (value predicateResult, ok bool) {
n.mu.RLock()
defer n.mu.RUnlock()
value, ok = n.cache[predicateID][equivalenceHash]
if ok {
metrics.EquivalenceCacheHits.Inc()
} else {
metrics.EquivalenceCacheMisses.Inc()
}
return value, ok
}
// invalidatePreds deletes cached predicates by given IDs.
func (n *NodeCache) invalidatePreds(predicateIDs []int) {
n.mu.Lock()
defer n.mu.Unlock()
for _, predicateID := range predicateIDs {
n.cache[predicateID] = nil
n.predicateGenerations[predicateID]++
}
}
// invalidate invalidates node cache.
func (n *NodeCache) invalidate() {
n.mu.Lock()
defer n.mu.Unlock()
n.cache = make(predicateMap, len(n.cache))
n.generation++
}
// equivalencePod is the set of pod attributes which must match for two pods to
// be considered equivalent for scheduling purposes. For correctness, this must
// include any Pod field which is used by a FitPredicate.
//
// NOTE: For equivalence hash to be formally correct, lists and maps in the
// equivalencePod should be normalized. (e.g. by sorting them) However, the vast
// majority of equivalent pod classes are expected to be created from a single
// pod template, so they will all have the same ordering.
type equivalencePod struct {
Namespace *string
Labels map[string]string
Affinity *v1.Affinity
Containers []v1.Container // See note about ordering
InitContainers []v1.Container // See note about ordering
NodeName *string
NodeSelector map[string]string
Tolerations []v1.Toleration
Volumes []v1.Volume // See note about ordering
}
// getEquivalencePod returns a normalized representation of a pod so that two
// "equivalent" pods will hash to the same value.
func getEquivalencePod(pod *v1.Pod) *equivalencePod {
ep := &equivalencePod{
Namespace: &pod.Namespace,
Labels: pod.Labels,
Affinity: pod.Spec.Affinity,
Containers: pod.Spec.Containers,
InitContainers: pod.Spec.InitContainers,
NodeName: &pod.Spec.NodeName,
NodeSelector: pod.Spec.NodeSelector,
Tolerations: pod.Spec.Tolerations,
Volumes: pod.Spec.Volumes,
}
// DeepHashObject considers nil and empty slices to be different. Normalize them.
if len(ep.Containers) == 0 {
ep.Containers = nil
}
if len(ep.InitContainers) == 0 {
ep.InitContainers = nil
}
if len(ep.Tolerations) == 0 {
ep.Tolerations = nil
}
if len(ep.Volumes) == 0 {
ep.Volumes = nil
}
// Normalize empty maps also.
if len(ep.Labels) == 0 {
ep.Labels = nil
}
if len(ep.NodeSelector) == 0 {
ep.NodeSelector = nil
}
// TODO(misterikkit): Also normalize nested maps and slices.
return ep
}