| /* |
| Copyright 2016 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package priorities |
| |
| import ( |
| "context" |
| "sync" |
| |
| "k8s.io/api/core/v1" |
| apierrors "k8s.io/apimachinery/pkg/api/errors" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/client-go/util/workqueue" |
| "k8s.io/kubernetes/pkg/scheduler/algorithm" |
| "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" |
| priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" |
| schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" |
| schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" |
| |
| "k8s.io/klog" |
| ) |
| |
| // InterPodAffinity contains the information needed to calculate inter-pod affinity priority. |
| type InterPodAffinity struct { |
| info predicates.NodeInfo |
| nodeLister algorithm.NodeLister |
| podLister algorithm.PodLister |
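| // hardPodAffinityWeight is the weight applied, for symmetry, when an |
| // existing pod's required (hard) affinity term matches the incoming pod. |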
| hardPodAffinityWeight int32 |
| } |
| |
| // NewInterPodAffinityPriority creates an InterPodAffinity and returns its scoring method as an algorithm.PriorityFunction. |
| func NewInterPodAffinityPriority( |
| info predicates.NodeInfo, |
| nodeLister algorithm.NodeLister, |
| podLister algorithm.PodLister, |
| hardPodAffinityWeight int32) algorithm.PriorityFunction { |
| interPodAffinity := &InterPodAffinity{ |
| info: info, |
| nodeLister: nodeLister, |
| podLister: podLister, |
| hardPodAffinityWeight: hardPodAffinityWeight, |
| } |
| return interPodAffinity.CalculateInterPodAffinityPriority |
| } |
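| |
| // A typical wiring, shown as an illustrative sketch (the concrete NodeInfo |
| // and lister values here are assumptions, not defined in this file): |
| // |
| // prioritize := NewInterPodAffinityPriority(nodeInfo, nodeLister, podLister, 1) |
| // hostPriorities, err := prioritize(pod, nodeNameToInfo, nodes) |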
| |
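| // podAffinityPriorityMap collects, under a mutex, the per-node scores |
| // accumulated while affinity terms are processed in parallel. |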
| type podAffinityPriorityMap struct { |
| sync.Mutex |
| |
| // nodes contains all nodes that should be considered. |
| nodes []*v1.Node |
| // counts stores the mapping from node name to the score computed so far |
| // for that node. |
| counts map[string]float64 |
| // firstError is the first error encountered while computing scores. |
| firstError error |
| } |
| |
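| // newPodAffinityPriorityMap creates a podAffinityPriorityMap for the given |
| // nodes with an initially empty score map. |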
| func newPodAffinityPriorityMap(nodes []*v1.Node) *podAffinityPriorityMap { |
| return &podAffinityPriorityMap{ |
| nodes: nodes, |
| counts: make(map[string]float64, len(nodes)), |
| } |
| } |
| |
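| // setError records err as the first error encountered; later errors are |
| // dropped, so the first failure is the one reported to the caller. |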
| func (p *podAffinityPriorityMap) setError(err error) { |
| p.Lock() |
| defer p.Unlock() |
| if p.firstError == nil { |
| p.firstError = err |
| } |
| } |
| |
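| // processTerm checks whether podToCheck matches the term's namespaces and |
| // label selector and, if so, adds weight to the score of every node whose |
| // value for the term's topology key equals that of fixedNode. For example, |
| // with the "kubernetes.io/hostname" topology key only fixedNode itself is |
| // incremented (assuming unique hostname labels), while a zone-level key |
| // increments every node in fixedNode's zone. |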
| func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, weight float64) { |
| namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(podDefiningAffinityTerm, term) |
| selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) |
| if err != nil { |
| p.setError(err) |
| return |
| } |
| match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, namespaces, selector) |
| if match { |
| func() { |
| p.Lock() |
| defer p.Unlock() |
| for _, node := range p.nodes { |
| if priorityutil.NodesHaveSameTopologyKey(node, fixedNode, term.TopologyKey) { |
| p.counts[node.Name] += weight |
| } |
| } |
| }() |
| } |
| } |
| |
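| // processTerms applies processTerm to every weighted term, scaling each |
| // term's weight by multiplier: callers pass +1 for affinity terms and -1 |
| // for anti-affinity terms. |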
| func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) { |
| for i := range terms { |
| term := &terms[i] |
| p.processTerm(&term.PodAffinityTerm, podDefiningAffinityTerm, podToCheck, fixedNode, float64(term.Weight*int32(multiplier))) |
| } |
| } |
| |
| // CalculateInterPodAffinityPriority computes a sum for each node by iterating through the elements of weightedPodAffinityTerm and adding |
| // "weight" to the sum if the corresponding PodAffinityTerm is satisfied for |
| // that node; the node(s) with the highest sum are the most preferred. |
| // Symmetry needs to be considered for preferredDuringSchedulingIgnoredDuringExecution from podAffinity & podAntiAffinity, |
| // as well as for the hard requirements from podAffinity. |
| func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) { |
| affinity := pod.Spec.Affinity |
| hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil |
| hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil |
| |
| allNodeNames := make([]string, 0, len(nodeNameToInfo)) |
| for name := range nodeNameToInfo { |
| allNodeNames = append(allNodeNames, name) |
| } |
| |
| // Convert the topology-key-based weights to node-name-based weights. |
| var maxCount, minCount float64 |
| // pm stores the mapping from node name to the score computed so far for |
| // the node. |
| pm := newPodAffinityPriorityMap(nodes) |
| |
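| // processPod accumulates the score contributions of one existing pod: the |
| // incoming pod's soft terms matched against existingPod and, for symmetry, |
| // existingPod's hard and soft terms matched against the incoming pod. |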
| processPod := func(existingPod *v1.Pod) error { |
| existingPodNode, err := ipa.info.GetNodeInfo(existingPod.Spec.NodeName) |
| if err != nil { |
| if apierrors.IsNotFound(err) { |
| klog.Errorf("Node not found, %v", existingPod.Spec.NodeName) |
| return nil |
| } |
| return err |
| } |
| existingPodAffinity := existingPod.Spec.Affinity |
| existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil |
| existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil |
| |
| if hasAffinityConstraints { |
| // For every soft pod affinity term of <pod>, if <existingPod> matches the term, |
| // increment <pm.counts> for every node in the cluster with the same <term.TopologyKey> |
| // value as that of <existingPod>'s node by the term's weight. |
| terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution |
| pm.processTerms(terms, pod, existingPod, existingPodNode, 1) |
| } |
| if hasAntiAffinityConstraints { |
| // For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term, |
| // decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey> |
| // value as that of <existingPod>'s node by the term's weight. |
| terms := affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution |
| pm.processTerms(terms, pod, existingPod, existingPodNode, -1) |
| } |
| |
| if existingHasAffinityConstraints { |
| // For every hard pod affinity term of <existingPod>, if <pod> matches the term, |
| // increment <pm.counts> for every node in the cluster with the same <term.TopologyKey> |
| // value as that of <existingPod>'s node by the constant <ipa.hardPodAffinityWeight>. |
| if ipa.hardPodAffinityWeight > 0 { |
| terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution |
| // TODO: Uncomment this block once RequiredDuringSchedulingRequiredDuringExecution is implemented. |
| //if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { |
| // terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...) |
| //} |
| for _, term := range terms { |
| pm.processTerm(&term, existingPod, pod, existingPodNode, float64(ipa.hardPodAffinityWeight)) |
| } |
| } |
| // For every soft pod affinity term of <existingPod>, if <pod> matches the term, |
| // increment <pm.counts> for every node in the cluster with the same <term.TopologyKey> |
| // value as that of <existingPod>'s node by the term's weight. |
| terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution |
| pm.processTerms(terms, existingPod, pod, existingPodNode, 1) |
| } |
| if existingHasAntiAffinityConstraints { |
| // For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term, |
| // decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey> |
| // value as that of <existingPod>'s node by the term's weight. |
| terms := existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution |
| pm.processTerms(terms, existingPod, pod, existingPodNode, -1) |
| } |
| return nil |
| } |
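| // processNode scores the pods on the node with the i-th name. Which pods |
| // are examined depends on whether the incoming pod has any constraints of |
| // its own (see the two branches below). |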
| processNode := func(i int) { |
| nodeInfo := nodeNameToInfo[allNodeNames[i]] |
| if nodeInfo.Node() != nil { |
| if hasAffinityConstraints || hasAntiAffinityConstraints { |
| // The incoming pod has affinity or anti-affinity constraints, so every |
| // existing pod on the node must be examined. |
| for _, existingPod := range nodeInfo.Pods() { |
| if err := processPod(existingPod); err != nil { |
| pm.setError(err) |
| } |
| } |
| } else { |
| // The incoming pod doesn't have any constraints, so only existing pods |
| // that have some can affect the score. |
| for _, existingPod := range nodeInfo.PodsWithAffinity() { |
| if err := processPod(existingPod); err != nil { |
| pm.setError(err) |
| } |
| } |
| } |
| } |
| } |
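| // Score the nodes in parallel with 16 workers; any error recorded in pm is |
| // surfaced once all workers have finished. |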
| workqueue.ParallelizeUntil(context.TODO(), 16, len(allNodeNames), processNode) |
| if pm.firstError != nil { |
| return nil, pm.firstError |
| } |
| |
| for _, node := range nodes { |
| if pm.counts[node.Name] > maxCount { |
| maxCount = pm.counts[node.Name] |
| } |
| if pm.counts[node.Name] < minCount { |
| minCount = pm.counts[node.Name] |
| } |
| } |
| |
| // Calculate the final priority score for each node. |
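| // The raw counts are normalized into the range [0, MaxPriority]: |
| // |
| // fScore = MaxPriority * (count - minCount) / (maxCount - minCount) |
| // |
| // For example, counts {A: 4, B: -2, C: 0} with MaxPriority == 10 yield |
| // scores A=10, B=0, C=3 (3.33 truncated by the conversion to int). |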
| result := make(schedulerapi.HostPriorityList, 0, len(nodes)) |
| for _, node := range nodes { |
| fScore := float64(0) |
| if (maxCount - minCount) > 0 { |
| fScore = float64(schedulerapi.MaxPriority) * ((pm.counts[node.Name] - minCount) / (maxCount - minCount)) |
| } |
| result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)}) |
| if klog.V(10) { |
| klog.Infof("%v -> %v: InterPodAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore)) |
| } |
| } |
| return result, nil |
| } |