blob: 8832cf503f650e07cde1613d6643ab318cae0e92 [file] [log] [blame]
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package autoscaling
import (
"encoding/json"
"fmt"
"math"
"strings"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
testutils "k8s.io/kubernetes/test/utils"
imageutils "k8s.io/kubernetes/test/utils/image"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/klog"
)
// Timeouts and limits used by the scalability specs in this file.
const (
// memoryReservationTimeout bounds the ReserveMemory calls that saturate the
// nodes present at the start of a spec.
memoryReservationTimeout = 5 * time.Minute
// largeResizeTimeout bounds waiting for ready nodes after setMigSizes has
// resized the cluster to maxNodes.
largeResizeTimeout = 8 * time.Minute
// largeScaleUpTimeout is the timeout given to the large RCs (and the
// distributeLoad helper) that drive scale-up in the specs.
largeScaleUpTimeout = 10 * time.Minute
// NOTE(review): largeScaleDownTimeout and minute are not referenced in this
// part of the file; they may be used elsewhere in the package — confirm.
largeScaleDownTimeout = 20 * time.Minute
minute = 1 * time.Minute
// maxNodes is the cluster size the specs scale up to or resize to directly.
maxNodes = 1000
)
// clusterPredicates describes the cluster state a scale-up test expects to
// reach; nodes is the target node count.
type clusterPredicates struct {
nodes int
}
// scaleUpTestConfig describes one scale-up scenario run by
// simpleScaleUpTestWithTolerance.
type scaleUpTestConfig struct {
// initialNodes and initialPods record the cluster state before the
// scale-up. simpleScaleUpTestWithTolerance does not read them.
initialNodes int
initialPods int
// extraPods is the RC whose pods are expected to trigger the scale-up.
extraPods *testutils.RCConfig
// expectedResult is the cluster state to wait for after extraPods is run.
expectedResult *clusterPredicates
}
// Cluster Autoscaler scalability suite. Every spec starts from the MIG sizes
// recorded by the first BeforeEach, and AfterEach resizes the MIGs back to
// them and makes all nodes schedulable again.
var _ = framework.KubeDescribe("Cluster size autoscaler scalability [Slow]", func() {
	f := framework.NewDefaultFramework("autoscaling")
	var c clientset.Interface
	var nodeCount int
	var coresPerNode int
	var memCapacityMb int
	var originalSizes map[string]int
	var sum int

	BeforeEach(func() {
		framework.SkipUnlessProviderIs("gce", "gke", "kubemark")

		// Check if Cloud Autoscaler is enabled by trying to get its ConfigMap.
		_, err := f.ClientSet.CoreV1().ConfigMaps("kube-system").Get("cluster-autoscaler-status", metav1.GetOptions{})
		if err != nil {
			framework.Skipf("test expects Cluster Autoscaler to be enabled")
		}

		c = f.ClientSet
		// Record the MIG sizes only once so that every spec is restored to the
		// same baseline by AfterEach.
		if originalSizes == nil {
			originalSizes = make(map[string]int)
			sum = 0
			for _, mig := range strings.Split(framework.TestContext.CloudConfig.NodeInstanceGroup, ",") {
				size, err := framework.GroupSize(mig)
				framework.ExpectNoError(err)
				By(fmt.Sprintf("Initial size of %s: %d", mig, size))
				originalSizes[mig] = size
				sum += size
			}
		}

		framework.ExpectNoError(framework.WaitForReadyNodes(c, sum, scaleUpTimeout))

		nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
		nodeCount = len(nodes.Items)
		Expect(nodeCount).NotTo(BeZero())
		// Per-node capacity is read from the first node only; the specs assume
		// all nodes have the same capacity.
		cpu := nodes.Items[0].Status.Capacity[v1.ResourceCPU]
		mem := nodes.Items[0].Status.Capacity[v1.ResourceMemory]
		coresPerNode = int((&cpu).MilliValue() / 1000)
		memCapacityMb = int((&mem).Value() / 1024 / 1024)

		Expect(nodeCount).Should(Equal(sum))

		if framework.ProviderIs("gke") {
			val, err := isAutoscalerEnabled(3)
			framework.ExpectNoError(err)
			if !val {
				err = enableAutoscaler("default-pool", 3, 5)
				framework.ExpectNoError(err)
			}
		}
	})

	AfterEach(func() {
		By("Restoring initial size of the cluster")
		setMigSizes(originalSizes)
		framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount, scaleDownTimeout))
		nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
		framework.ExpectNoError(err)

		// Make every node schedulable again. A CriticalAddonsOnlyError restarts
		// the whole pass after makeSchedulableDelay, until makeSchedulableTimeout
		// has elapsed.
		s := time.Now()
	makeSchedulableLoop:
		for start := time.Now(); time.Since(start) < makeSchedulableTimeout; time.Sleep(makeSchedulableDelay) {
			for _, n := range nodes.Items {
				err = makeNodeSchedulable(c, &n, true)
				switch err.(type) {
				case CriticalAddonsOnlyError:
					continue makeSchedulableLoop
				default:
					framework.ExpectNoError(err)
				}
			}
			break
		}
		klog.Infof("Made nodes schedulable again in %v", time.Since(s).String())
	})

	It("should scale up at all [Feature:ClusterAutoscalerScalability1]", func() {
		perNodeReservation := int(float64(memCapacityMb) * 0.95)
		replicasPerNode := 10

		additionalNodes := maxNodes - nodeCount
		replicas := additionalNodes * replicasPerNode
		additionalReservation := additionalNodes * perNodeReservation

		// saturate cluster
		initialReplicas := nodeCount * 2
		reservationCleanup := ReserveMemory(f, "some-pod", initialReplicas, nodeCount*perNodeReservation, true, memoryReservationTimeout)
		defer reservationCleanup()
		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))

		// configure pending pods & expected scale up
		rcConfig := reserveMemoryRCConfig(f, "extra-pod-1", replicas, additionalReservation, largeScaleUpTimeout)
		expectedResult := createClusterPredicates(nodeCount + additionalNodes)
		config := createScaleUpTestConfig(nodeCount, initialReplicas, rcConfig, expectedResult)

		// run test
		testCleanup := simpleScaleUpTest(f, config)
		defer testCleanup()
	})

	It("should scale up twice [Feature:ClusterAutoscalerScalability2]", func() {
		perNodeReservation := int(float64(memCapacityMb) * 0.95)
		replicasPerNode := 10
		additionalNodes1 := int(math.Ceil(0.7 * maxNodes))
		additionalNodes2 := int(math.Ceil(0.25 * maxNodes))
		if additionalNodes1+additionalNodes2 > maxNodes {
			additionalNodes2 = maxNodes - additionalNodes1
		}

		replicas1 := additionalNodes1 * replicasPerNode
		replicas2 := additionalNodes2 * replicasPerNode

		klog.Infof("cores per node: %v", coresPerNode)

		// saturate cluster
		initialReplicas := nodeCount
		reservationCleanup := ReserveMemory(f, "some-pod", initialReplicas, nodeCount*perNodeReservation, true, memoryReservationTimeout)
		defer reservationCleanup()
		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))

		klog.Infof("Reserved successfully")

		// configure pending pods & expected scale up #1
		rcConfig := reserveMemoryRCConfig(f, "extra-pod-1", replicas1, additionalNodes1*perNodeReservation, largeScaleUpTimeout)
		expectedResult := createClusterPredicates(nodeCount + additionalNodes1)
		config := createScaleUpTestConfig(nodeCount, nodeCount, rcConfig, expectedResult)

		// run test #1; tolerate 5% of the new nodes / all pods not becoming ready
		tolerateUnreadyNodes := additionalNodes1 / 20
		tolerateUnreadyPods := (initialReplicas + replicas1) / 20
		testCleanup1 := simpleScaleUpTestWithTolerance(f, config, tolerateUnreadyNodes, tolerateUnreadyPods)
		defer testCleanup1()

		klog.Infof("Scaled up once")

		// configure pending pods & expected scale up #2; the pods from step #1
		// are already running.
		rcConfig2 := reserveMemoryRCConfig(f, "extra-pod-2", replicas2, additionalNodes2*perNodeReservation, largeScaleUpTimeout)
		expectedResult2 := createClusterPredicates(nodeCount + additionalNodes1 + additionalNodes2)
		config2 := createScaleUpTestConfig(nodeCount+additionalNodes1, initialReplicas+replicas1, rcConfig2, expectedResult2)

		// run test #2
		tolerateUnreadyNodes = maxNodes / 20
		tolerateUnreadyPods = (initialReplicas + replicas1 + replicas2) / 20
		testCleanup2 := simpleScaleUpTestWithTolerance(f, config2, tolerateUnreadyNodes, tolerateUnreadyPods)
		defer testCleanup2()

		klog.Infof("Scaled up twice")
	})

	It("should scale down empty nodes [Feature:ClusterAutoscalerScalability3]", func() {
		perNodeReservation := int(float64(memCapacityMb) * 0.7)
		replicas := int(math.Ceil(maxNodes * 0.7))
		totalNodes := maxNodes

		// resize cluster to totalNodes
		newSizes := map[string]int{
			anyKey(originalSizes): totalNodes,
		}
		setMigSizes(newSizes)
		framework.ExpectNoError(framework.WaitForReadyNodes(f.ClientSet, totalNodes, largeResizeTimeout))

		// run replicas
		rcConfig := reserveMemoryRCConfig(f, "some-pod", replicas, replicas*perNodeReservation, largeScaleUpTimeout)
		expectedResult := createClusterPredicates(totalNodes)
		config := createScaleUpTestConfig(totalNodes, totalNodes, rcConfig, expectedResult)
		tolerateUnreadyNodes := totalNodes / 10
		tolerateUnreadyPods := replicas / 10
		testCleanup := simpleScaleUpTestWithTolerance(f, config, tolerateUnreadyNodes, tolerateUnreadyPods)
		defer testCleanup()

		// check if empty nodes are scaled down
		framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
			func(size int) bool {
				return size <= replicas+3 // leaving space for non-evictable kube-system pods
			}, scaleDownTimeout))
	})

	It("should scale down underutilized nodes [Feature:ClusterAutoscalerScalability4]", func() {
		perPodReservation := int(float64(memCapacityMb) * 0.01)
		// underutilizedNodes are 10% full
		underutilizedPerNodeReplicas := 10
		// fullNodes are 70% full
		fullPerNodeReplicas := 70
		totalNodes := maxNodes
		underutilizedRatio := 0.3
		maxDelta := 30

		// resize cluster to totalNodes
		newSizes := map[string]int{
			anyKey(originalSizes): totalNodes,
		}
		setMigSizes(newSizes)

		framework.ExpectNoError(framework.WaitForReadyNodes(f.ClientSet, totalNodes, largeResizeTimeout))

		// annotate all nodes with no-scale-down
		scaleDownDisabledKey := "cluster-autoscaler.kubernetes.io/scale-down-disabled"

		nodes, err := f.ClientSet.CoreV1().Nodes().List(metav1.ListOptions{
			FieldSelector: fields.Set{
				"spec.unschedulable": "false",
			}.AsSelector().String(),
		})

		framework.ExpectNoError(err)
		framework.ExpectNoError(addAnnotation(f, nodes.Items, scaleDownDisabledKey, "true"))

		// distribute pods using replication controllers taking up space that should
		// be empty after pods are distributed
		underutilizedNodesNum := int(float64(totalNodes) * underutilizedRatio)
		fullNodesNum := totalNodes - underutilizedNodesNum

		podDistribution := []podBatch{
			{numNodes: fullNodesNum, podsPerNode: fullPerNodeReplicas},
			{numNodes: underutilizedNodesNum, podsPerNode: underutilizedPerNodeReplicas}}

		cleanup := distributeLoad(f, f.Namespace.Name, "10-70", podDistribution, perPodReservation,
			int(0.95*float64(memCapacityMb)), map[string]string{}, largeScaleUpTimeout)
		defer cleanup()

		// enable scale down again
		framework.ExpectNoError(addAnnotation(f, nodes.Items, scaleDownDisabledKey, "false"))

		// wait for scale down to start. Node deletion takes a long time, so we just
		// wait for maximum of 30 nodes deleted
		nodesToScaleDownCount := int(float64(totalNodes) * 0.1)
		if nodesToScaleDownCount > maxDelta {
			nodesToScaleDownCount = maxDelta
		}
		expectedSize := totalNodes - nodesToScaleDownCount
		timeout := time.Duration(nodesToScaleDownCount)*time.Minute + scaleDownTimeout
		framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet, func(size int) bool {
			return size <= expectedSize
		}, timeout))
	})

	It("shouldn't scale down with underutilized nodes due to host port conflicts [Feature:ClusterAutoscalerScalability5]", func() {
		fullReservation := int(float64(memCapacityMb) * 0.9)
		hostPortPodReservation := int(float64(memCapacityMb) * 0.3)
		totalNodes := maxNodes
		reservedPort := 4321

		// resize cluster to totalNodes
		newSizes := map[string]int{
			anyKey(originalSizes): totalNodes,
		}
		setMigSizes(newSizes)
		framework.ExpectNoError(framework.WaitForReadyNodes(f.ClientSet, totalNodes, largeResizeTimeout))
		divider := int(float64(totalNodes) * 0.7)
		fullNodesCount := divider
		underutilizedNodesCount := totalNodes - fullNodesCount

		By("Reserving full nodes")
		// run RC1 w/o host port
		cleanup := ReserveMemory(f, "filling-pod", fullNodesCount, fullNodesCount*fullReservation, true, largeScaleUpTimeout*2)
		defer cleanup()

		By("Reserving host ports on remaining nodes")
		// run RC2 w/ host port
		cleanup2 := createHostPortPodsWithMemory(f, "underutilizing-host-port-pod", underutilizedNodesCount, reservedPort, underutilizedNodesCount*hostPortPodReservation, largeScaleUpTimeout)
		defer cleanup2()

		// Fail early if the setup pods are not ready; otherwise the check below
		// would not be testing the intended layout.
		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
		// wait and check scale down doesn't occur
		By(fmt.Sprintf("Sleeping %v minutes...", scaleDownTimeout.Minutes()))
		time.Sleep(scaleDownTimeout)

		By("Checking if the number of nodes is as expected")
		nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
		klog.Infof("Nodes: %v, expected: %v", len(nodes.Items), totalNodes)
		Expect(len(nodes.Items)).Should(Equal(totalNodes))
	})

	Specify("CA ignores unschedulable pods while scheduling schedulable pods [Feature:ClusterAutoscalerScalability6]", func() {
		// Start a number of pods saturating existing nodes.
		perNodeReservation := int(float64(memCapacityMb) * 0.80)
		replicasPerNode := 10
		initialPodReplicas := nodeCount * replicasPerNode
		initialPodsTotalMemory := nodeCount * perNodeReservation
		reservationCleanup := ReserveMemory(f, "initial-pod", initialPodReplicas, initialPodsTotalMemory, true /* wait for pods to run */, memoryReservationTimeout)
		defer reservationCleanup()
		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))

		// Configure a number of unschedulable pods.
		unschedulableMemReservation := memCapacityMb * 2
		unschedulablePodReplicas := 1000
		totalMemReservation := unschedulableMemReservation * unschedulablePodReplicas
		timeToWait := 5 * time.Minute
		podsConfig := reserveMemoryRCConfig(f, "unschedulable-pod", unschedulablePodReplicas, totalMemReservation, timeToWait)
		framework.RunRC(*podsConfig) // Ignore error (it will occur because pods are unschedulable)
		defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, podsConfig.Name)

		// Ensure that no new nodes have been added so far.
		Expect(framework.NumberOfReadyNodes(f.ClientSet)).To(Equal(nodeCount))

		// Start a number of schedulable pods to ensure CA reacts.
		additionalNodes := maxNodes - nodeCount
		replicas := additionalNodes * replicasPerNode
		totalMemory := additionalNodes * perNodeReservation
		rcConfig := reserveMemoryRCConfig(f, "extra-pod", replicas, totalMemory, largeScaleUpTimeout)
		expectedResult := createClusterPredicates(nodeCount + additionalNodes)
		config := createScaleUpTestConfig(nodeCount, initialPodReplicas, rcConfig, expectedResult)

		// Test that scale up happens, allowing 1000 unschedulable pods not to be scheduled.
		testCleanup := simpleScaleUpTestWithTolerance(f, config, 0, unschedulablePodReplicas)
		defer testCleanup()
	})
})
// anyKey returns one key of input, or "" when input is empty or nil.
//
// The lexicographically smallest key is returned so that the choice is stable
// across runs. Go randomizes map iteration order, so returning the first key
// seen would resize a different MIG from run to run when several exist.
func anyKey(input map[string]int) string {
	chosen := ""
	found := false
	for k := range input {
		if !found || k < chosen {
			chosen, found = k, true
		}
	}
	return chosen
}
// simpleScaleUpTestWithTolerance runs the config.extraPods RC and waits for the
// cluster to reach config.expectedResult.nodes nodes and for the pods in the
// test namespace to become ready, failing the test via framework.ExpectNoError
// otherwise.
//
// A positive tolerateMissingNodeCount accepts a cluster that is short of the
// target by up to that many nodes. A positive tolerateMissingPodCount lets up
// to that many pods stay not ready. Zero or negative values mean no tolerance.
// config.initialNodes and config.initialPods are not used; the cluster is not
// resized here.
//
// The returned function deletes the RC and waits for its garbage collection.
func simpleScaleUpTestWithTolerance(f *framework.Framework, config *scaleUpTestConfig, tolerateMissingNodeCount int, tolerateMissingPodCount int) func() error {
	target := config.expectedResult.nodes

	By(fmt.Sprintf("Running RC %v from config", config.extraPods.Name))
	began := time.Now()
	framework.ExpectNoError(framework.RunRC(*config.extraPods))

	// Wait for the node count: exact when no shortfall is tolerated, otherwise
	// any size at or above the tolerated floor.
	var nodesErr error
	switch {
	case tolerateMissingNodeCount <= 0:
		nodesErr = framework.WaitForReadyNodes(f.ClientSet, target, scaleUpTimeout)
	default:
		floor := target - tolerateMissingNodeCount
		atLeastFloor := func(size int) bool { return size >= floor }
		nodesErr = WaitForClusterSizeFunc(f.ClientSet, atLeastFloor, scaleUpTimeout)
	}
	framework.ExpectNoError(nodesErr)
	klog.Infof("cluster is increased")

	// Wait for the pods, optionally letting some stay unready.
	var podsErr error
	switch {
	case tolerateMissingPodCount <= 0:
		podsErr = waitForAllCaPodsReadyInNamespace(f, f.ClientSet)
	default:
		podsErr = waitForCaPodsReadyInNamespace(f, f.ClientSet, tolerateMissingPodCount)
	}
	framework.ExpectNoError(podsErr)

	timeTrack(began, fmt.Sprintf("Scale up to %v", target))

	cleanup := func() error {
		return framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, config.extraPods.Name)
	}
	return cleanup
}
// simpleScaleUpTest is simpleScaleUpTestWithTolerance with no tolerance. The
// cluster must reach exactly the expected number of ready nodes, and every pod
// waited on must become ready.
func simpleScaleUpTest(f *framework.Framework, config *scaleUpTestConfig) func() error {
	const noMissingNodes, noMissingPods = 0, 0
	return simpleScaleUpTestWithTolerance(f, config, noMissingNodes, noMissingPods)
}
// reserveMemoryRCConfig builds, but does not run, an RC config named id in the
// test namespace. The RC has `replicas` pause pods whose memory requests add
// up to roughly `megabytes` MiB.
//
// Each pod requests megabytes*1MiB/replicas bytes (integer division). When
// replicas <= 0 the per-pod request is 0 rather than a division-by-zero panic.
func reserveMemoryRCConfig(f *framework.Framework, id string, replicas, megabytes int, timeout time.Duration) *testutils.RCConfig {
	// Compute in int64: a whole-cluster reservation (hundreds of nodes' worth
	// of MiB) multiplied by 1MiB does not fit in a 32-bit int.
	var perPodBytes int64
	if replicas > 0 {
		perPodBytes = int64(megabytes) * 1024 * 1024 / int64(replicas)
	}
	return &testutils.RCConfig{
		Client:         f.ClientSet,
		InternalClient: f.InternalClientset,
		Name:           id,
		Namespace:      f.Namespace.Name,
		Timeout:        timeout,
		Image:          imageutils.GetPauseImageName(),
		Replicas:       replicas,
		MemRequest:     perPodBytes,
	}
}
// createScaleUpTestConfig assembles a scaleUpTestConfig from the initial
// node/pod counts, the RC that should trigger the scale-up, and the expected
// end state.
func createScaleUpTestConfig(nodes, pods int, extraPods *testutils.RCConfig, expectedResult *clusterPredicates) *scaleUpTestConfig {
	cfg := new(scaleUpTestConfig)
	cfg.initialNodes, cfg.initialPods = nodes, pods
	cfg.extraPods = extraPods
	cfg.expectedResult = expectedResult
	return cfg
}
// createClusterPredicates returns the expected cluster state for a scale-up
// test, with nodes as the target node count.
func createClusterPredicates(nodes int) *clusterPredicates {
	expected := new(clusterPredicates)
	expected.nodes = nodes
	return expected
}
// addAnnotation sets the annotation key=value on every node in nodes using a
// strategic merge patch. It returns the first error encountered; nodes before
// the failing one remain patched.
//
// Each patch is computed from a deep copy of the node, with key removed in the
// "old" document. As a result:
//   - the patch always carries key=value, even if the listed node object is
//     stale (e.g. a previous call changed the annotation on the server);
//   - the caller's nodes, including their shared Annotations maps, are never
//     modified.
func addAnnotation(f *framework.Framework, nodes []v1.Node, key, value string) error {
	for i := range nodes {
		// Index instead of ranging by value: v1.Node is a large struct.
		patched := nodes[i].DeepCopy()
		if patched.Annotations == nil {
			patched.Annotations = make(map[string]string)
		}

		delete(patched.Annotations, key)
		oldData, err := json.Marshal(patched)
		if err != nil {
			return err
		}

		patched.Annotations[key] = value
		newData, err := json.Marshal(patched)
		if err != nil {
			return err
		}

		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
		if err != nil {
			return err
		}

		if _, err := f.ClientSet.CoreV1().Nodes().Patch(patched.Name, types.StrategicMergePatchType, patchBytes); err != nil {
			return err
		}
	}
	return nil
}
// createHostPortPodsWithMemory runs an RC named id with `replicas` pause pods.
// Each pod requests host port `port`, so no two of them can run on the same
// node, plus an equal share of `megabytes` MiB of memory. The test fails (via
// framework.ExpectNoError) if RunRC returns an error. The returned function
// deletes the RC and waits for its garbage collection.
//
// When replicas <= 0 the per-pod memory request is 0 rather than a
// division-by-zero panic.
func createHostPortPodsWithMemory(f *framework.Framework, id string, replicas, port, megabytes int, timeout time.Duration) func() error {
	By("Running RC which reserves host port and memory")
	// Compute in int64 so that large reservations do not overflow a 32-bit int.
	var request int64
	if replicas > 0 {
		request = int64(megabytes) * 1024 * 1024 / int64(replicas)
	}
	config := &testutils.RCConfig{
		Client:         f.ClientSet,
		InternalClient: f.InternalClientset,
		Name:           id,
		Namespace:      f.Namespace.Name,
		Timeout:        timeout,
		Image:          imageutils.GetPauseImageName(),
		Replicas:       replicas,
		HostPorts:      map[string]int{"port1": port},
		MemRequest:     request,
	}
	err := framework.RunRC(*config)
	framework.ExpectNoError(err)
	return func() error {
		return framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, id)
	}
}
// podBatch describes one group of nodes in a distributeLoad layout: numNodes
// nodes, each intended to end up running podsPerNode pods of the target RC.
type podBatch struct {
numNodes int
podsPerNode int
}
// distributeLoad distributes pods in the way described by podDistribution. It
// assumes all pods have the same memory request and all nodes the same memory
// capacity, which lets a test generate load on the cluster in exactly the
// shape it wants.
//
// To achieve this it:
//  1. Creates, for every batch, a replication controller with one pod per
//     node. Each pod reserves the memory that should remain free on its node
//     after setup, and all of them use a conflicting host port so they land on
//     different nodes.
//  2. Creates the target RC named id, which generates the actual load.
//  3. Removes the RCs created in step 1 on return. Failures are logged, not
//     fatal.
//
// Batches with numNodes <= 0 are skipped: they need no filler RC, and creating
// one with zero replicas would divide by zero when splitting its memory.
//
// NOTE(review): namespace and labels are currently unused. All RCs are created
// in f.Namespace.Name and carry no extra labels.
//
// The returned function deletes the target RC and waits for its garbage
// collection.
func distributeLoad(f *framework.Framework, namespace string, id string, podDistribution []podBatch,
	podMemRequestMegabytes int, nodeMemCapacity int, labels map[string]string, timeout time.Duration) func() error {
	port := 8013
	// Create load-distribution RCs with one pod per node, reserving all remaining
	// memory to force the distribution of pods for the target RCs.
	// The load-distribution RCs will be deleted on function return.
	totalPods := 0
	for i, batch := range podDistribution {
		if batch.numNodes <= 0 {
			continue
		}
		totalPods += batch.numNodes * batch.podsPerNode
		remainingMem := nodeMemCapacity - batch.podsPerNode*podMemRequestMegabytes
		replicas := batch.numNodes
		name := fmt.Sprintf("load-distribution%d", i)
		cleanup := createHostPortPodsWithMemory(f, name, replicas, port, remainingMem*replicas, timeout)
		// Deferred on purpose: the filler RCs must stay until the target RC is
		// running, i.e. until this function returns.
		defer func() {
			if err := cleanup(); err != nil {
				klog.Warningf("Failed to delete load-distribution RC %s: %v", name, err)
			}
		}()
	}
	framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, f.ClientSet))
	// Create the target RC
	rcConfig := reserveMemoryRCConfig(f, id, totalPods, totalPods*podMemRequestMegabytes, timeout)
	framework.ExpectNoError(framework.RunRC(*rcConfig))
	framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, f.ClientSet))
	return func() error {
		return framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, id)
	}
}
// timeTrack logs how much time the operation called name has taken since
// start.
func timeTrack(start time.Time, name string) {
	klog.Infof("%s took %s", name, time.Since(start))
}