| /* |
| Copyright 2016 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package podautoscaler |
| |
| import ( |
| "fmt" |
| "math" |
| "time" |
| |
| autoscaling "k8s.io/api/autoscaling/v2beta2" |
| "k8s.io/api/core/v1" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| "k8s.io/apimachinery/pkg/labels" |
| "k8s.io/apimachinery/pkg/util/sets" |
| corelisters "k8s.io/client-go/listers/core/v1" |
| podutil "k8s.io/kubernetes/pkg/api/v1/pod" |
| metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" |
| ) |
| |
| const ( |
| // defaultTestingTolerance is the default tolerance below which a change in the |
| // usage ratio is considered too small to trigger a scale up or scale down. |
| defaultTestingTolerance = 0.1 |
| // defaultTestingCpuInitializationPeriod is the period after pod start during which |
| // CPU samples from not-yet-ready pods may be discarded as unreliable. |
| defaultTestingCpuInitializationPeriod = 2 * time.Minute |
| // defaultTestingDelayOfInitialReadinessStatus is how long after pod start an unready |
| // condition is still treated as the pod never having been ready. |
| defaultTestingDelayOfInitialReadinessStatus = 10 * time.Second |
| ) |
| |
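| // ReplicaCalculator bundles the metrics client, pod lister, and scaling tolerance/timing |
| // parameters needed to compute desired replica counts for the horizontal pod autoscaler. |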
| type ReplicaCalculator struct { |
| metricsClient metricsclient.MetricsClient |
| podLister corelisters.PodLister |
| tolerance float64 |
| cpuInitializationPeriod time.Duration |
| delayOfInitialReadinessStatus time.Duration |
| } |
| |
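| // NewReplicaCalculator returns a ReplicaCalculator wired to the given metrics client and |
| // pod lister, with the supplied tolerance and CPU readiness timing parameters. |
| // A minimal wiring sketch (variable names and values are illustrative only): |
| // |
| // calc := NewReplicaCalculator(metricsClient, podInformer.Lister(), 0.1, 5*time.Minute, 30*time.Second) |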
| func NewReplicaCalculator(metricsClient metricsclient.MetricsClient, podLister corelisters.PodLister, tolerance float64, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) *ReplicaCalculator { |
| return &ReplicaCalculator{ |
| metricsClient: metricsClient, |
| podLister: podLister, |
| tolerance: tolerance, |
| cpuInitializationPeriod: cpuInitializationPeriod, |
| delayOfInitialReadinessStatus: delayOfInitialReadinessStatus, |
| } |
| } |
| |
| // GetResourceReplicas calculates the desired replica count based on a target resource utilization percentage |
| // of the given resource for pods matching the given selector in the given namespace, and the current replica count |
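| // |
| // The core computation is desiredReplicas = ceil(usageRatio * readyPodCount), where |
| // usageRatio = currentUtilization / targetUtilization. For example (illustrative numbers |
| // only): an observed utilization of 150% against a 100% target across 4 ready pods yields |
| // ceil(1.5 * 4) = 6 desired replicas, subject to the tolerance check and the |
| // unready/missing-pod adjustments below. |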
| func (c *ReplicaCalculator) GetResourceReplicas(currentReplicas int32, targetUtilization int32, resource v1.ResourceName, namespace string, selector labels.Selector) (replicaCount int32, utilization int32, rawUtilization int64, timestamp time.Time, err error) { |
| metrics, timestamp, err := c.metricsClient.GetResourceMetric(resource, namespace, selector) |
| if err != nil { |
| return 0, 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err) |
| } |
| podList, err := c.podLister.Pods(namespace).List(selector) |
| if err != nil { |
| return 0, 0, 0, time.Time{}, fmt.Errorf("unable to get pods while calculating replica count: %v", err) |
| } |
| |
| itemsLen := len(podList) |
| if itemsLen == 0 { |
| return 0, 0, 0, time.Time{}, fmt.Errorf("no pods returned by selector while calculating replica count") |
| } |
| |
| readyPodCount, ignoredPods, missingPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus) |
| removeMetricsForPods(metrics, ignoredPods) |
| requests, err := calculatePodRequests(podList, resource) |
| if err != nil { |
| return 0, 0, 0, time.Time{}, err |
| } |
| |
| if len(metrics) == 0 { |
| return 0, 0, 0, time.Time{}, fmt.Errorf("did not receive metrics for any ready pods") |
| } |
| |
| usageRatio, utilization, rawUtilization, err := metricsclient.GetResourceUtilizationRatio(metrics, requests, targetUtilization) |
| if err != nil { |
| return 0, 0, 0, time.Time{}, err |
| } |
| |
| rebalanceIgnored := len(ignoredPods) > 0 && usageRatio > 1.0 |
| if !rebalanceIgnored && len(missingPods) == 0 { |
| if math.Abs(1.0-usageRatio) <= c.tolerance { |
| // return the current replicas if the change would be too small |
| return currentReplicas, utilization, rawUtilization, timestamp, nil |
| } |
| |
| // no adjustment for unready or missing pods is needed, so scale directly on the ready pod count |
| return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, rawUtilization, timestamp, nil |
| } |
| |
| if len(missingPods) > 0 { |
| if usageRatio < 1.0 { |
| // on a scale-down, treat missing pods as using 100% of the resource request |
| for podName := range missingPods { |
| metrics[podName] = metricsclient.PodMetric{Value: requests[podName]} |
| } |
| } else if usageRatio > 1.0 { |
| // on a scale-up, treat missing pods as using 0% of the resource request |
| for podName := range missingPods { |
| metrics[podName] = metricsclient.PodMetric{Value: 0} |
| } |
| } |
| } |
| |
| if rebalanceIgnored { |
| // on a scale-up, treat unready pods as using 0% of the resource request |
| for podName := range ignoredPods { |
| metrics[podName] = metricsclient.PodMetric{Value: 0} |
| } |
| } |
| |
| // re-run the utilization calculation with our new numbers |
| newUsageRatio, _, _, err := metricsclient.GetResourceUtilizationRatio(metrics, requests, targetUtilization) |
| if err != nil { |
| return 0, utilization, rawUtilization, time.Time{}, err |
| } |
| |
| if math.Abs(1.0-newUsageRatio) <= c.tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) { |
| // return the current replicas if the change would be too small, |
| // or if the new usage ratio would cause a change in scale direction |
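| // (illustrative: a raw usageRatio of 1.3 that drops to a newUsageRatio of 0.9 once unready |
| // pods are counted as using zero would otherwise flip a scale-up into a scale-down, |
| // so we hold at the current count) |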
| return currentReplicas, utilization, rawUtilization, timestamp, nil |
| } |
| |
| // return the result, computed against however many pods actually factored into |
| // our adjusted calculation |
| return int32(math.Ceil(newUsageRatio * float64(len(metrics)))), utilization, rawUtilization, timestamp, nil |
| } |
| |
| // GetRawResourceReplicas calculates the desired replica count based on a target resource utilization (as a raw milli-value) |
| // for pods matching the given selector in the given namespace, and the current replica count |
| func (c *ReplicaCalculator) GetRawResourceReplicas(currentReplicas int32, targetUtilization int64, resource v1.ResourceName, namespace string, selector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) { |
| metrics, timestamp, err := c.metricsClient.GetResourceMetric(resource, namespace, selector) |
| if err != nil { |
| return 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err) |
| } |
| |
| replicaCount, utilization, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUtilization, namespace, selector, resource) |
| return replicaCount, utilization, timestamp, err |
| } |
| |
| // GetMetricReplicas calculates the desired replica count based on a target metric utilization |
| // (as a milli-value) for pods matching the given selector in the given namespace, and the |
| // current replica count |
| func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUtilization int64, metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) { |
| metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector, metricSelector) |
| if err != nil { |
| return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err) |
| } |
| |
| replicaCount, utilization, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUtilization, namespace, selector, v1.ResourceName("")) |
| return replicaCount, utilization, timestamp, err |
| } |
| |
| // calcPlainMetricReplicas calculates the desired replicas for plain (i.e. non-utilization percentage) metrics. |
| func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) { |
| podList, err := c.podLister.Pods(namespace).List(selector) |
| if err != nil { |
| return 0, 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err) |
| } |
| |
| if len(podList) == 0 { |
| return 0, 0, fmt.Errorf("no pods returned by selector while calculating replica count") |
| } |
| |
| readyPodCount, ignoredPods, missingPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus) |
| removeMetricsForPods(metrics, ignoredPods) |
| |
| if len(metrics) == 0 { |
| return 0, 0, fmt.Errorf("did not receive metrics for any ready pods") |
| } |
| |
| usageRatio, utilization := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization) |
| |
| rebalanceIgnored := len(ignoredPods) > 0 && usageRatio > 1.0 |
| |
| if !rebalanceIgnored && len(missingPods) == 0 { |
| if math.Abs(1.0-usageRatio) <= c.tolerance { |
| // return the current replicas if the change would be too small |
| return currentReplicas, utilization, nil |
| } |
| |
| // no adjustment for unready or missing pods is needed, so scale directly on the ready pod count |
| return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, nil |
| } |
| |
| if len(missingPods) > 0 { |
| if usageRatio < 1.0 { |
| // on a scale-down, treat missing pods as consuming exactly the target amount of the metric |
| for podName := range missingPods { |
| metrics[podName] = metricsclient.PodMetric{Value: targetUtilization} |
| } |
| } else { |
| // on a scale-up, treat missing pods as consuming none of the metric |
| for podName := range missingPods { |
| metrics[podName] = metricsclient.PodMetric{Value: 0} |
| } |
| } |
| } |
| |
| if rebalanceIgnored { |
| // on a scale-up, treat unready pods as consuming none of the metric |
| for podName := range ignoredPods { |
| metrics[podName] = metricsclient.PodMetric{Value: 0} |
| } |
| } |
| |
| // re-run the utilization calculation with our new numbers |
| newUsageRatio, _ := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization) |
| |
| if math.Abs(1.0-newUsageRatio) <= c.tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) { |
| // return the current replicas if the change would be too small, |
| // or if the new usage ratio would cause a change in scale direction |
| return currentReplicas, utilization, nil |
| } |
| |
| // return the result, computed against however many pods actually factored into |
| // our adjusted calculation |
| return int32(math.Ceil(newUsageRatio * float64(len(metrics)))), utilization, nil |
| } |
| |
| // GetObjectMetricReplicas calculates the desired replica count based on a target metric utilization (as a milli-value) |
| // for the given object in the given namespace, and the current replica count. |
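| // For example (illustrative numbers only): an object metric of 30000m against a 20000m |
| // target with 4 ready pods gives a ratio of 1.5 and a desired count of ceil(1.5 * 4) = 6. |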
| func (c *ReplicaCalculator) GetObjectMetricReplicas(currentReplicas int32, targetUtilization int64, metricName string, namespace string, objectRef *autoscaling.CrossVersionObjectReference, selector labels.Selector, metricSelector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) { |
| utilization, timestamp, err = c.metricsClient.GetObjectMetric(metricName, namespace, objectRef, metricSelector) |
| if err != nil { |
| return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v on %s %s/%s", metricName, objectRef.Kind, namespace, objectRef.Name, err) |
| } |
| |
| usageRatio := float64(utilization) / float64(targetUtilization) |
| if math.Abs(1.0-usageRatio) <= c.tolerance { |
| // return the current replicas if the change would be too small |
| return currentReplicas, utilization, timestamp, nil |
| } |
| |
| readyPodCount, err := c.getReadyPodsCount(namespace, selector) |
| if err != nil { |
| return 0, 0, time.Time{}, fmt.Errorf("unable to calculate ready pods: %v", err) |
| } |
| |
| replicaCount = int32(math.Ceil(usageRatio * float64(readyPodCount))) |
| |
| return replicaCount, utilization, timestamp, nil |
| } |
| |
| // TODO(mattjmcnaughton): Many different functions in this module use variations |
| // of this function. Make this function generic, so we don't repeat the same |
| // logic in multiple places. |
| func (c *ReplicaCalculator) getReadyPodsCount(namespace string, selector labels.Selector) (int64, error) { |
| podList, err := c.podLister.Pods(namespace).List(selector) |
| if err != nil { |
| return 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err) |
| } |
| |
| if len(podList) == 0 { |
| return 0, fmt.Errorf("no pods returned by selector while calculating replica count") |
| } |
| |
| readyPodCount := 0 |
| |
| for _, pod := range podList { |
| if pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod) { |
| readyPodCount++ |
| } |
| } |
| |
| return int64(readyPodCount), nil |
| } |
| |
| // GetExternalMetricReplicas calculates the desired replica count based on a |
| // target metric value (as a milli-value) for the external metric in the given |
| // namespace, and the current replica count. |
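| // The matching external metric values are summed and compared against the target; when the |
| // ratio is outside the tolerance, the ready pod count is scaled by that ratio, e.g. |
| // (illustrative) a summed value of 8000m against a 4000m target with 3 ready pods gives |
| // ceil(2.0 * 3) = 6. |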
| func (c *ReplicaCalculator) GetExternalMetricReplicas(currentReplicas int32, targetUtilization int64, metricName, namespace string, metricSelector *metav1.LabelSelector, podSelector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) { |
| metricLabelSelector, err := metav1.LabelSelectorAsSelector(metricSelector) |
| if err != nil { |
| return 0, 0, time.Time{}, err |
| } |
| metrics, timestamp, err := c.metricsClient.GetExternalMetric(metricName, namespace, metricLabelSelector) |
| if err != nil { |
| return 0, 0, time.Time{}, fmt.Errorf("unable to get external metric %s/%s/%+v: %s", namespace, metricName, metricSelector, err) |
| } |
| utilization = 0 |
| for _, val := range metrics { |
| utilization += val |
| } |
| |
| readyPodCount, err := c.getReadyPodsCount(namespace, podSelector) |
| if err != nil { |
| return 0, 0, time.Time{}, fmt.Errorf("unable to calculate ready pods: %v", err) |
| } |
| |
| usageRatio := float64(utilization) / float64(targetUtilization) |
| if math.Abs(1.0-usageRatio) <= c.tolerance { |
| // return the current replicas if the change would be too small |
| return currentReplicas, utilization, timestamp, nil |
| } |
| |
| return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, timestamp, nil |
| } |
| |
| // GetExternalPerPodMetricReplicas calculates the desired replica count based on a |
| // target metric value per pod (as a milli-value) for the external metric in the |
| // given namespace, and the current replica count. |
| func (c *ReplicaCalculator) GetExternalPerPodMetricReplicas(currentReplicas int32, targetUtilizationPerPod int64, metricName, namespace string, metricSelector *metav1.LabelSelector) (replicaCount int32, utilization int64, timestamp time.Time, err error) { |
| metricLabelSelector, err := metav1.LabelSelectorAsSelector(metricSelector) |
| if err != nil { |
| return 0, 0, time.Time{}, err |
| } |
| metrics, timestamp, err := c.metricsClient.GetExternalMetric(metricName, namespace, metricLabelSelector) |
| if err != nil { |
| return 0, 0, time.Time{}, fmt.Errorf("unable to get external metric %s/%s/%+v: %s", namespace, metricName, metricSelector, err) |
| } |
| utilization = 0 |
| for _, val := range metrics { |
| utilization += val |
| } |
| |
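| // The ratio compares the summed external metric against targetUtilizationPerPod * currentReplicas. |
| // For example (illustrative numbers only): a 6000m total against a 1000m per-pod target and |
| // 4 current replicas gives a ratio of 1.5, which exceeds the tolerance, so the new count |
| // becomes ceil(6000/1000) = 6; the reported utilization is the per-pod average ceil(6000/4) = 1500m. |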
| replicaCount = currentReplicas |
| usageRatio := float64(utilization) / (float64(targetUtilizationPerPod) * float64(replicaCount)) |
| if math.Abs(1.0-usageRatio) > c.tolerance { |
| // update number of replicas if the change is large enough |
| replicaCount = int32(math.Ceil(float64(utilization) / float64(targetUtilizationPerPod))) |
| } |
| utilization = int64(math.Ceil(float64(utilization) / float64(currentReplicas))) |
| return replicaCount, utilization, timestamp, nil |
| } |
| |
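| // groupPods partitions the listed pods into a count of ready pods, a set of ignored pods |
| // (CPU-only: pods that are unready or still too close to startup for a trustworthy sample), |
| // and a set of pods missing from the metrics; pods that are being deleted or have failed |
| // are skipped entirely. |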
| func groupPods(pods []*v1.Pod, metrics metricsclient.PodMetricsInfo, resource v1.ResourceName, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) (readyPodCount int, ignoredPods sets.String, missingPods sets.String) { |
| missingPods = sets.NewString() |
| ignoredPods = sets.NewString() |
| for _, pod := range pods { |
| if pod.DeletionTimestamp != nil || pod.Status.Phase == v1.PodFailed { |
| continue |
| } |
| metric, found := metrics[pod.Name] |
| if !found { |
| missingPods.Insert(pod.Name) |
| continue |
| } |
| if resource == v1.ResourceCPU { |
| var ignorePod bool |
| _, condition := podutil.GetPodCondition(&pod.Status, v1.PodReady) |
| if condition == nil || pod.Status.StartTime == nil { |
| ignorePod = true |
| } else { |
| // Pod still within possible initialization period. |
| if pod.Status.StartTime.Add(cpuInitializationPeriod).After(time.Now()) { |
| // Ignore sample if pod is unready or one window of metric wasn't collected since last state transition. |
| ignorePod = condition.Status == v1.ConditionFalse || metric.Timestamp.Before(condition.LastTransitionTime.Time.Add(metric.Window)) |
| } else { |
| // Ignore metric if pod is unready and it has never been ready. |
| ignorePod = condition.Status == v1.ConditionFalse && pod.Status.StartTime.Add(delayOfInitialReadinessStatus).After(condition.LastTransitionTime.Time) |
| } |
| } |
| if ignorePod { |
| ignoredPods.Insert(pod.Name) |
| continue |
| } |
| } |
| readyPodCount++ |
| } |
| return |
| } |
| |
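| // calculatePodRequests sums each pod's per-container requests for the given resource in |
| // milli-units, returning an error if any container lacks a request for that resource. |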
| func calculatePodRequests(pods []*v1.Pod, resource v1.ResourceName) (map[string]int64, error) { |
| requests := make(map[string]int64, len(pods)) |
| for _, pod := range pods { |
| podSum := int64(0) |
| for _, container := range pod.Spec.Containers { |
| if containerRequest, ok := container.Resources.Requests[resource]; ok { |
| podSum += containerRequest.MilliValue() |
| } else { |
| return nil, fmt.Errorf("missing request for %s", resource) |
| } |
| } |
| requests[pod.Name] = podSum |
| } |
| return requests, nil |
| } |
| |
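| // removeMetricsForPods deletes the metric entries for the given pods so they do not |
| // contribute to the utilization calculation. |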
| func removeMetricsForPods(metrics metricsclient.PodMetricsInfo, pods sets.String) { |
| for _, pod := range pods.UnsortedList() { |
| delete(metrics, pod) |
| } |
| } |