blob: c84d515a02bbb174a856f286cc1a77bbd8ce3d8d [file] [log] [blame]
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package autoscaling
import (
"bytes"
"fmt"
"io/ioutil"
"math"
"net/http"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"time"
"k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
schedulerapi "k8s.io/api/scheduling/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/scheduling"
testutils "k8s.io/kubernetes/test/utils"
imageutils "k8s.io/kubernetes/test/utils/image"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/klog"
)
const (
defaultTimeout = 3 * time.Minute
resizeTimeout = 5 * time.Minute
manualResizeTimeout = 6 * time.Minute
scaleUpTimeout = 5 * time.Minute
scaleUpTriggerTimeout = 2 * time.Minute
scaleDownTimeout = 20 * time.Minute
podTimeout = 2 * time.Minute
nodesRecoverTimeout = 5 * time.Minute
rcCreationRetryTimeout = 4 * time.Minute
rcCreationRetryDelay = 20 * time.Second
makeSchedulableTimeout = 10 * time.Minute
makeSchedulableDelay = 20 * time.Second
freshStatusLimit = 20 * time.Second
gkeEndpoint = "https://test-container.sandbox.googleapis.com"
gkeUpdateTimeout = 15 * time.Minute
gkeNodepoolNameKey = "cloud.google.com/gke-nodepool"
disabledTaint = "DisabledForAutoscalingTest"
criticalAddonsOnlyTaint = "CriticalAddonsOnly"
newNodesForScaledownTests = 2
unhealthyClusterThreshold = 4
caNoScaleUpStatus = "NoActivity"
caOngoingScaleUpStatus = "InProgress"
timestampFormat = "2006-01-02 15:04:05 -0700 MST"
expendablePriorityClassName = "expendable-priority"
highPriorityClassName = "high-priority"
gpuLabel = "cloud.google.com/gke-accelerator"
)
var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
f := framework.NewDefaultFramework("autoscaling")
var c clientset.Interface
var nodeCount int
var coreCount int64
var memAllocatableMb int
var originalSizes map[string]int
BeforeEach(func() {
c = f.ClientSet
framework.SkipUnlessProviderIs("gce", "gke")
originalSizes = make(map[string]int)
sum := 0
for _, mig := range strings.Split(framework.TestContext.CloudConfig.NodeInstanceGroup, ",") {
size, err := framework.GroupSize(mig)
framework.ExpectNoError(err)
By(fmt.Sprintf("Initial size of %s: %d", mig, size))
originalSizes[mig] = size
sum += size
}
// Give instances time to spin up
framework.ExpectNoError(framework.WaitForReadyNodes(c, sum, scaleUpTimeout))
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
nodeCount = len(nodes.Items)
coreCount = 0
for _, node := range nodes.Items {
quantity := node.Status.Allocatable[v1.ResourceCPU]
coreCount += quantity.Value()
}
By(fmt.Sprintf("Initial number of schedulable nodes: %v", nodeCount))
Expect(nodeCount).NotTo(BeZero())
mem := nodes.Items[0].Status.Allocatable[v1.ResourceMemory]
memAllocatableMb = int((&mem).Value() / 1024 / 1024)
Expect(nodeCount).Should(Equal(sum))
if framework.ProviderIs("gke") {
val, err := isAutoscalerEnabled(5)
framework.ExpectNoError(err)
if !val {
err = enableAutoscaler("default-pool", 3, 5)
framework.ExpectNoError(err)
}
}
})
AfterEach(func() {
framework.SkipUnlessProviderIs("gce", "gke")
By(fmt.Sprintf("Restoring initial size of the cluster"))
setMigSizes(originalSizes)
expectedNodes := 0
for _, size := range originalSizes {
expectedNodes += size
}
framework.ExpectNoError(framework.WaitForReadyNodes(c, expectedNodes, scaleDownTimeout))
nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
framework.ExpectNoError(err)
s := time.Now()
makeSchedulableLoop:
for start := time.Now(); time.Since(start) < makeSchedulableTimeout; time.Sleep(makeSchedulableDelay) {
for _, n := range nodes.Items {
err = makeNodeSchedulable(c, &n, true)
switch err.(type) {
case CriticalAddonsOnlyError:
continue makeSchedulableLoop
default:
framework.ExpectNoError(err)
}
}
break
}
klog.Infof("Made nodes schedulable again in %v", time.Since(s).String())
})
It("shouldn't increase cluster size if pending pod is too large [Feature:ClusterSizeAutoscalingScaleUp]", func() {
By("Creating unschedulable pod")
ReserveMemory(f, "memory-reservation", 1, int(1.1*float64(memAllocatableMb)), false, defaultTimeout)
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "memory-reservation")
By("Waiting for scale up hoping it won't happen")
// Verify that the appropriate event was generated
eventFound := false
EventsLoop:
for start := time.Now(); time.Since(start) < scaleUpTimeout; time.Sleep(20 * time.Second) {
By("Waiting for NotTriggerScaleUp event")
events, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(metav1.ListOptions{})
framework.ExpectNoError(err)
for _, e := range events.Items {
if e.InvolvedObject.Kind == "Pod" && e.Reason == "NotTriggerScaleUp" && strings.Contains(e.Message, "it wouldn't fit if a new node is added") {
By("NotTriggerScaleUp event found")
eventFound = true
break EventsLoop
}
}
}
Expect(eventFound).Should(Equal(true))
// Verify that cluster size is not changed
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size <= nodeCount }, time.Second))
})
simpleScaleUpTest := func(unready int) {
ReserveMemory(f, "memory-reservation", 100, nodeCount*memAllocatableMb, false, 1*time.Second)
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "memory-reservation")
// Verify that cluster size is increased
framework.ExpectNoError(WaitForClusterSizeFuncWithUnready(f.ClientSet,
func(size int) bool { return size >= nodeCount+1 }, scaleUpTimeout, unready))
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
}
It("should increase cluster size if pending pods are small [Feature:ClusterSizeAutoscalingScaleUp]",
func() { simpleScaleUpTest(0) })
gpuType := os.Getenv("TESTED_GPU_TYPE")
It(fmt.Sprintf("Should scale up GPU pool from 0 [GpuType:%s] [Feature:ClusterSizeAutoscalingGpu]", gpuType), func() {
framework.SkipUnlessProviderIs("gke")
if gpuType == "" {
framework.Failf("TEST_GPU_TYPE not defined")
return
}
const gpuPoolName = "gpu-pool"
addGpuNodePool(gpuPoolName, gpuType, 1, 0)
defer deleteNodePool(gpuPoolName)
installNvidiaDriversDaemonSet()
By("Enable autoscaler")
framework.ExpectNoError(enableAutoscaler(gpuPoolName, 0, 1))
defer disableAutoscaler(gpuPoolName, 0, 1)
Expect(len(getPoolNodes(f, gpuPoolName))).Should(Equal(0))
By("Schedule a pod which requires GPU")
framework.ExpectNoError(ScheduleAnySingleGpuPod(f, "gpu-pod-rc"))
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "gpu-pod-rc")
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size == nodeCount+1 }, scaleUpTimeout))
Expect(len(getPoolNodes(f, gpuPoolName))).Should(Equal(1))
})
It(fmt.Sprintf("Should scale up GPU pool from 1 [GpuType:%s] [Feature:ClusterSizeAutoscalingGpu]", gpuType), func() {
framework.SkipUnlessProviderIs("gke")
if gpuType == "" {
framework.Failf("TEST_GPU_TYPE not defined")
return
}
const gpuPoolName = "gpu-pool"
addGpuNodePool(gpuPoolName, gpuType, 1, 1)
defer deleteNodePool(gpuPoolName)
installNvidiaDriversDaemonSet()
By("Schedule a single pod which requires GPU")
framework.ExpectNoError(ScheduleAnySingleGpuPod(f, "gpu-pod-rc"))
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "gpu-pod-rc")
By("Enable autoscaler")
framework.ExpectNoError(enableAutoscaler(gpuPoolName, 0, 2))
defer disableAutoscaler(gpuPoolName, 0, 2)
Expect(len(getPoolNodes(f, gpuPoolName))).Should(Equal(1))
By("Scale GPU deployment")
framework.ScaleRC(f.ClientSet, f.ScalesGetter, f.Namespace.Name, "gpu-pod-rc", 2, true)
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size == nodeCount+2 }, scaleUpTimeout))
Expect(len(getPoolNodes(f, gpuPoolName))).Should(Equal(2))
})
It(fmt.Sprintf("Should not scale GPU pool up if pod does not require GPUs [GpuType:%s] [Feature:ClusterSizeAutoscalingGpu]", gpuType), func() {
framework.SkipUnlessProviderIs("gke")
if gpuType == "" {
framework.Failf("TEST_GPU_TYPE not defined")
return
}
const gpuPoolName = "gpu-pool"
addGpuNodePool(gpuPoolName, gpuType, 1, 0)
defer deleteNodePool(gpuPoolName)
installNvidiaDriversDaemonSet()
By("Enable autoscaler")
framework.ExpectNoError(enableAutoscaler(gpuPoolName, 0, 1))
defer disableAutoscaler(gpuPoolName, 0, 1)
Expect(len(getPoolNodes(f, gpuPoolName))).Should(Equal(0))
By("Schedule bunch of pods beyond point of filling default pool but do not request any GPUs")
ReserveMemory(f, "memory-reservation", 100, nodeCount*memAllocatableMb, false, 1*time.Second)
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "memory-reservation")
// Verify that cluster size is increased
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size >= nodeCount+1 }, scaleUpTimeout))
// Expect gpu pool to stay intact
Expect(len(getPoolNodes(f, gpuPoolName))).Should(Equal(0))
})
It(fmt.Sprintf("Should scale down GPU pool from 1 [GpuType:%s] [Feature:ClusterSizeAutoscalingGpu]", gpuType), func() {
framework.SkipUnlessProviderIs("gke")
if gpuType == "" {
framework.Failf("TEST_GPU_TYPE not defined")
return
}
const gpuPoolName = "gpu-pool"
addGpuNodePool(gpuPoolName, gpuType, 1, 1)
defer deleteNodePool(gpuPoolName)
installNvidiaDriversDaemonSet()
By("Schedule a single pod which requires GPU")
framework.ExpectNoError(ScheduleAnySingleGpuPod(f, "gpu-pod-rc"))
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "gpu-pod-rc")
By("Enable autoscaler")
framework.ExpectNoError(enableAutoscaler(gpuPoolName, 0, 1))
defer disableAutoscaler(gpuPoolName, 0, 1)
Expect(len(getPoolNodes(f, gpuPoolName))).Should(Equal(1))
By("Remove the only POD requiring GPU")
framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "gpu-pod-rc")
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size == nodeCount }, scaleDownTimeout))
Expect(len(getPoolNodes(f, gpuPoolName))).Should(Equal(0))
})
It("should increase cluster size if pending pods are small and one node is broken [Feature:ClusterSizeAutoscalingScaleUp]",
func() {
framework.TestUnderTemporaryNetworkFailure(c, "default", getAnyNode(c), func() { simpleScaleUpTest(1) })
})
It("shouldn't trigger additional scale-ups during processing scale-up [Feature:ClusterSizeAutoscalingScaleUp]", func() {
// Wait for the situation to stabilize - CA should be running and have up-to-date node readiness info.
status, err := waitForScaleUpStatus(c, func(s *scaleUpStatus) bool {
return s.ready == s.target && s.ready <= nodeCount
}, scaleUpTriggerTimeout)
framework.ExpectNoError(err)
unmanagedNodes := nodeCount - status.ready
By("Schedule more pods than can fit and wait for cluster to scale-up")
ReserveMemory(f, "memory-reservation", 100, nodeCount*memAllocatableMb, false, 1*time.Second)
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "memory-reservation")
status, err = waitForScaleUpStatus(c, func(s *scaleUpStatus) bool {
return s.status == caOngoingScaleUpStatus
}, scaleUpTriggerTimeout)
framework.ExpectNoError(err)
target := status.target
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
By("Expect no more scale-up to be happening after all pods are scheduled")
status, err = getScaleUpStatus(c)
framework.ExpectNoError(err)
if status.target != target {
klog.Warningf("Final number of nodes (%v) does not match initial scale-up target (%v).", status.target, target)
}
Expect(status.timestamp.Add(freshStatusLimit).Before(time.Now())).Should(Equal(false))
Expect(status.status).Should(Equal(caNoScaleUpStatus))
Expect(status.ready).Should(Equal(status.target))
Expect(len(framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items)).Should(Equal(status.target + unmanagedNodes))
})
It("should increase cluster size if pending pods are small and there is another node pool that is not autoscaled [Feature:ClusterSizeAutoscalingScaleUp]", func() {
framework.SkipUnlessProviderIs("gke")
By("Creating new node-pool with n1-standard-4 machines")
const extraPoolName = "extra-pool"
addNodePool(extraPoolName, "n1-standard-4", 1)
defer deleteNodePool(extraPoolName)
extraNodes := getPoolInitialSize(extraPoolName)
framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount+extraNodes, resizeTimeout))
// We wait for nodes to become schedulable to make sure the new nodes
// will be returned by getPoolNodes below.
framework.ExpectNoError(framework.WaitForAllNodesSchedulable(c, resizeTimeout))
klog.Infof("Not enabling cluster autoscaler for the node pool (on purpose).")
By("Getting memory available on new nodes, so we can account for it when creating RC")
nodes := getPoolNodes(f, extraPoolName)
Expect(len(nodes)).Should(Equal(extraNodes))
extraMemMb := 0
for _, node := range nodes {
mem := node.Status.Allocatable[v1.ResourceMemory]
extraMemMb += int((&mem).Value() / 1024 / 1024)
}
By("Reserving 0.1x more memory than the cluster holds to trigger scale up")
totalMemoryReservation := int(1.1 * float64(nodeCount*memAllocatableMb+extraMemMb))
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "memory-reservation")
ReserveMemory(f, "memory-reservation", 100, totalMemoryReservation, false, defaultTimeout)
// Verify, that cluster size is increased
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size >= nodeCount+extraNodes+1 }, scaleUpTimeout))
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
})
It("should disable node pool autoscaling [Feature:ClusterSizeAutoscalingScaleUp]", func() {
framework.SkipUnlessProviderIs("gke")
By("Creating new node-pool with n1-standard-4 machines")
const extraPoolName = "extra-pool"
addNodePool(extraPoolName, "n1-standard-4", 1)
defer deleteNodePool(extraPoolName)
extraNodes := getPoolInitialSize(extraPoolName)
framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount+extraNodes, resizeTimeout))
framework.ExpectNoError(enableAutoscaler(extraPoolName, 1, 2))
framework.ExpectNoError(disableAutoscaler(extraPoolName, 1, 2))
})
It("should increase cluster size if pods are pending due to host port conflict [Feature:ClusterSizeAutoscalingScaleUp]", func() {
scheduling.CreateHostPortPods(f, "host-port", nodeCount+2, false)
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "host-port")
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size >= nodeCount+2 }, scaleUpTimeout))
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
})
It("should increase cluster size if pods are pending due to pod anti-affinity [Feature:ClusterSizeAutoscalingScaleUp]", func() {
pods := nodeCount
newPods := 2
labels := map[string]string{
"anti-affinity": "yes",
}
By("starting a pod with anti-affinity on each node")
framework.ExpectNoError(runAntiAffinityPods(f, f.Namespace.Name, pods, "some-pod", labels, labels))
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "some-pod")
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
By("scheduling extra pods with anti-affinity to existing ones")
framework.ExpectNoError(runAntiAffinityPods(f, f.Namespace.Name, newPods, "extra-pod", labels, labels))
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "extra-pod")
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount+newPods, scaleUpTimeout))
})
It("should increase cluster size if pod requesting EmptyDir volume is pending [Feature:ClusterSizeAutoscalingScaleUp]", func() {
By("creating pods")
pods := nodeCount
newPods := 1
labels := map[string]string{
"anti-affinity": "yes",
}
framework.ExpectNoError(runAntiAffinityPods(f, f.Namespace.Name, pods, "some-pod", labels, labels))
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "some-pod")
By("waiting for all pods before triggering scale up")
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
By("creating a pod requesting EmptyDir")
framework.ExpectNoError(runVolumeAntiAffinityPods(f, f.Namespace.Name, newPods, "extra-pod", labels, labels, emptyDirVolumes))
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "extra-pod")
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount+newPods, scaleUpTimeout))
})
It("should increase cluster size if pod requesting volume is pending [Feature:ClusterSizeAutoscalingScaleUp]", func() {
framework.SkipUnlessProviderIs("gce", "gke")
volumeLabels := labels.Set{
framework.VolumeSelectorKey: f.Namespace.Name,
}
selector := metav1.SetAsLabelSelector(volumeLabels)
By("creating volume & pvc")
diskName, err := framework.CreatePDWithRetry()
framework.ExpectNoError(err)
pvConfig := framework.PersistentVolumeConfig{
NamePrefix: "gce-",
Labels: volumeLabels,
PVSource: v1.PersistentVolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: diskName,
FSType: "ext3",
ReadOnly: false,
},
},
Prebind: nil,
}
emptyStorageClass := ""
pvcConfig := framework.PersistentVolumeClaimConfig{
Selector: selector,
StorageClassName: &emptyStorageClass,
}
pv, pvc, err := framework.CreatePVPVC(c, pvConfig, pvcConfig, f.Namespace.Name, false)
framework.ExpectNoError(err)
framework.ExpectNoError(framework.WaitOnPVandPVC(c, f.Namespace.Name, pv, pvc))
defer func() {
errs := framework.PVPVCCleanup(c, f.Namespace.Name, pv, pvc)
if len(errs) > 0 {
framework.Failf("failed to delete PVC and/or PV. Errors: %v", utilerrors.NewAggregate(errs))
}
pv, pvc = nil, nil
if diskName != "" {
framework.ExpectNoError(framework.DeletePDWithRetry(diskName))
}
}()
By("creating pods")
pods := nodeCount
labels := map[string]string{
"anti-affinity": "yes",
}
framework.ExpectNoError(runAntiAffinityPods(f, f.Namespace.Name, pods, "some-pod", labels, labels))
defer func() {
framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "some-pod")
klog.Infof("RC and pods not using volume deleted")
}()
By("waiting for all pods before triggering scale up")
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
By("creating a pod requesting PVC")
pvcPodName := "pvc-pod"
newPods := 1
volumes := buildVolumes(pv, pvc)
framework.ExpectNoError(runVolumeAntiAffinityPods(f, f.Namespace.Name, newPods, pvcPodName, labels, labels, volumes))
defer func() {
framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, pvcPodName)
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
}()
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount+newPods, scaleUpTimeout))
})
It("should add node to the particular mig [Feature:ClusterSizeAutoscalingScaleUp]", func() {
labelKey := "cluster-autoscaling-test.special-node"
labelValue := "true"
By("Finding the smallest MIG")
minMig := ""
minSize := nodeCount
for mig, size := range originalSizes {
if size <= minSize {
minMig = mig
minSize = size
}
}
if minSize == 0 {
newSizes := make(map[string]int)
for mig, size := range originalSizes {
newSizes[mig] = size
}
newSizes[minMig] = 1
setMigSizes(newSizes)
}
removeLabels := func(nodesToClean sets.String) {
By("Removing labels from nodes")
for node := range nodesToClean {
framework.RemoveLabelOffNode(c, node, labelKey)
}
}
nodes, err := framework.GetGroupNodes(minMig)
framework.ExpectNoError(err)
nodesSet := sets.NewString(nodes...)
defer removeLabels(nodesSet)
By(fmt.Sprintf("Annotating nodes of the smallest MIG(%s): %v", minMig, nodes))
for node := range nodesSet {
framework.AddOrUpdateLabelOnNode(c, node, labelKey, labelValue)
}
scheduling.CreateNodeSelectorPods(f, "node-selector", minSize+1, map[string]string{labelKey: labelValue}, false)
By("Waiting for new node to appear and annotating it")
framework.WaitForGroupSize(minMig, int32(minSize+1))
// Verify that cluster size is increased
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size >= nodeCount+1 }, scaleUpTimeout))
newNodes, err := framework.GetGroupNodes(minMig)
framework.ExpectNoError(err)
newNodesSet := sets.NewString(newNodes...)
newNodesSet.Delete(nodes...)
if len(newNodesSet) > 1 {
By(fmt.Sprintf("Spotted following new nodes in %s: %v", minMig, newNodesSet))
klog.Infof("Usually only 1 new node is expected, investigating")
klog.Infof("Kubectl:%s\n", framework.RunKubectlOrDie("get", "nodes", "-o", "json"))
if output, err := exec.Command("gcloud", "compute", "instances", "list",
"--project="+framework.TestContext.CloudConfig.ProjectID,
"--zone="+framework.TestContext.CloudConfig.Zone).Output(); err == nil {
klog.Infof("Gcloud compute instances list: %s", output)
} else {
klog.Errorf("Failed to get instances list: %v", err)
}
for newNode := range newNodesSet {
if output, err := execCmd("gcloud", "compute", "instances", "describe",
newNode,
"--project="+framework.TestContext.CloudConfig.ProjectID,
"--zone="+framework.TestContext.CloudConfig.Zone).Output(); err == nil {
klog.Infof("Gcloud compute instances describe: %s", output)
} else {
klog.Errorf("Failed to get instances describe: %v", err)
}
}
// TODO: possibly remove broken node from newNodesSet to prevent removeLabel from crashing.
// However at this moment we DO WANT it to crash so that we don't check all test runs for the
// rare behavior, but only the broken ones.
}
By(fmt.Sprintf("New nodes: %v\n", newNodesSet))
registeredNodes := sets.NewString()
for nodeName := range newNodesSet {
node, err := f.ClientSet.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
if err == nil && node != nil {
registeredNodes.Insert(nodeName)
} else {
klog.Errorf("Failed to get node %v: %v", nodeName, err)
}
}
By(fmt.Sprintf("Setting labels for registered new nodes: %v", registeredNodes.List()))
for node := range registeredNodes {
framework.AddOrUpdateLabelOnNode(c, node, labelKey, labelValue)
}
defer removeLabels(registeredNodes)
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
framework.ExpectNoError(framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "node-selector"))
})
It("should scale up correct target pool [Feature:ClusterSizeAutoscalingScaleUp]", func() {
framework.SkipUnlessProviderIs("gke")
By("Creating new node-pool with n1-standard-4 machines")
const extraPoolName = "extra-pool"
addNodePool(extraPoolName, "n1-standard-4", 1)
defer deleteNodePool(extraPoolName)
extraNodes := getPoolInitialSize(extraPoolName)
framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount+extraNodes, resizeTimeout))
framework.ExpectNoError(enableAutoscaler(extraPoolName, 1, 2))
defer disableAutoscaler(extraPoolName, 1, 2)
extraPods := extraNodes + 1
totalMemoryReservation := int(float64(extraPods) * 1.5 * float64(memAllocatableMb))
By(fmt.Sprintf("Creating rc with %v pods too big to fit default-pool but fitting extra-pool", extraPods))
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "memory-reservation")
ReserveMemory(f, "memory-reservation", extraPods, totalMemoryReservation, false, defaultTimeout)
// Apparently GKE master is restarted couple minutes after the node pool is added
// reseting all the timers in scale down code. Adding 5 extra minutes to workaround
// this issue.
// TODO: Remove the extra time when GKE restart is fixed.
framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount+extraNodes+1, scaleUpTimeout+5*time.Minute))
})
simpleScaleDownTest := func(unready int) {
cleanup, err := addKubeSystemPdbs(f)
defer cleanup()
framework.ExpectNoError(err)
By("Manually increase cluster size")
increasedSize := 0
newSizes := make(map[string]int)
for key, val := range originalSizes {
newSizes[key] = val + 2 + unready
increasedSize += val + 2 + unready
}
setMigSizes(newSizes)
framework.ExpectNoError(WaitForClusterSizeFuncWithUnready(f.ClientSet,
func(size int) bool { return size >= increasedSize }, manualResizeTimeout, unready))
By("Some node should be removed")
framework.ExpectNoError(WaitForClusterSizeFuncWithUnready(f.ClientSet,
func(size int) bool { return size < increasedSize }, scaleDownTimeout, unready))
}
It("should correctly scale down after a node is not needed [Feature:ClusterSizeAutoscalingScaleDown]",
func() { simpleScaleDownTest(0) })
It("should correctly scale down after a node is not needed and one node is broken [Feature:ClusterSizeAutoscalingScaleDown]",
func() {
framework.TestUnderTemporaryNetworkFailure(c, "default", getAnyNode(c), func() { simpleScaleDownTest(1) })
})
It("should correctly scale down after a node is not needed when there is non autoscaled pool[Feature:ClusterSizeAutoscalingScaleDown]", func() {
framework.SkipUnlessProviderIs("gke")
increasedSize := manuallyIncreaseClusterSize(f, originalSizes)
const extraPoolName = "extra-pool"
addNodePool(extraPoolName, "n1-standard-1", 3)
defer deleteNodePool(extraPoolName)
extraNodes := getPoolInitialSize(extraPoolName)
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size >= increasedSize+extraNodes }, scaleUpTimeout))
By("Some node should be removed")
// Apparently GKE master is restarted couple minutes after the node pool is added
// reseting all the timers in scale down code. Adding 10 extra minutes to workaround
// this issue.
// TODO: Remove the extra time when GKE restart is fixed.
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size < increasedSize+extraNodes }, scaleDownTimeout+10*time.Minute))
})
It("should be able to scale down when rescheduling a pod is required and pdb allows for it[Feature:ClusterSizeAutoscalingScaleDown]", func() {
runDrainTest(f, originalSizes, f.Namespace.Name, 1, 1, func(increasedSize int) {
By("Some node should be removed")
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size < increasedSize }, scaleDownTimeout))
})
})
It("shouldn't be able to scale down when rescheduling a pod is required, but pdb doesn't allow drain[Feature:ClusterSizeAutoscalingScaleDown]", func() {
runDrainTest(f, originalSizes, f.Namespace.Name, 1, 0, func(increasedSize int) {
By("No nodes should be removed")
time.Sleep(scaleDownTimeout)
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
Expect(len(nodes.Items)).Should(Equal(increasedSize))
})
})
It("should be able to scale down by draining multiple pods one by one as dictated by pdb[Feature:ClusterSizeAutoscalingScaleDown]", func() {
runDrainTest(f, originalSizes, f.Namespace.Name, 2, 1, func(increasedSize int) {
By("Some node should be removed")
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size < increasedSize }, scaleDownTimeout))
})
})
It("should be able to scale down by draining system pods with pdb[Feature:ClusterSizeAutoscalingScaleDown]", func() {
runDrainTest(f, originalSizes, "kube-system", 2, 1, func(increasedSize int) {
By("Some node should be removed")
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size < increasedSize }, scaleDownTimeout))
})
})
It("Should be able to scale a node group up from 0[Feature:ClusterSizeAutoscalingScaleUp]", func() {
// Provider-specific setup
if framework.ProviderIs("gke") {
// GKE-specific setup
By("Add a new node pool with 0 nodes and min size 0")
const extraPoolName = "extra-pool"
addNodePool(extraPoolName, "n1-standard-4", 0)
defer deleteNodePool(extraPoolName)
framework.ExpectNoError(enableAutoscaler(extraPoolName, 0, 1))
defer disableAutoscaler(extraPoolName, 0, 1)
} else {
// on GCE, run only if there are already at least 2 node groups
framework.SkipUnlessAtLeast(len(originalSizes), 2, "At least 2 node groups are needed for scale-to-0 tests")
By("Manually scale smallest node group to 0")
minMig := ""
minSize := nodeCount
for mig, size := range originalSizes {
if size <= minSize {
minMig = mig
minSize = size
}
}
framework.ExpectNoError(framework.ResizeGroup(minMig, int32(0)))
framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount-minSize, resizeTimeout))
}
By("Make remaining nodes unschedulable")
nodes, err := f.ClientSet.CoreV1().Nodes().List(metav1.ListOptions{FieldSelector: fields.Set{
"spec.unschedulable": "false",
}.AsSelector().String()})
framework.ExpectNoError(err)
for _, node := range nodes.Items {
err = makeNodeUnschedulable(f.ClientSet, &node)
defer func(n v1.Node) {
makeNodeSchedulable(f.ClientSet, &n, false)
}(node)
framework.ExpectNoError(err)
}
By("Run a scale-up test")
ReserveMemory(f, "memory-reservation", 1, 100, false, 1*time.Second)
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "memory-reservation")
// Verify that cluster size is increased
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size >= len(nodes.Items)+1 }, scaleUpTimeout))
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
})
// Scale to 0 test is split into two functions (for GKE & GCE.)
// The reason for it is that scenario is exactly the same,
// but setup & verification use different APIs.
//
// Scenario:
// (GKE only) add an extra node pool with size 1 & enable autoscaling for it
// (GCE only) find the smallest MIG & resize it to 1
// manually drain the single node from this node pool/MIG
// wait for cluster size to decrease
// verify the targeted node pool/MIG is of size 0
gkeScaleToZero := func() {
// GKE-specific setup
By("Add a new node pool with size 1 and min size 0")
const extraPoolName = "extra-pool"
addNodePool(extraPoolName, "n1-standard-4", 1)
defer deleteNodePool(extraPoolName)
extraNodes := getPoolInitialSize(extraPoolName)
framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount+extraNodes, resizeTimeout))
framework.ExpectNoError(enableAutoscaler(extraPoolName, 0, 1))
defer disableAutoscaler(extraPoolName, 0, 1)
ngNodes := getPoolNodes(f, extraPoolName)
Expect(len(ngNodes)).To(Equal(extraNodes))
for _, node := range ngNodes {
By(fmt.Sprintf("Target node for scale-down: %s", node.Name))
}
for _, node := range ngNodes {
drainNode(f, node)
}
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size <= nodeCount }, scaleDownTimeout))
// GKE-specific check
newSize := getPoolSize(f, extraPoolName)
Expect(newSize).Should(Equal(0))
}
gceScaleToZero := func() {
// non-GKE only
By("Find smallest node group and manually scale it to a single node")
minMig := ""
minSize := nodeCount
for mig, size := range originalSizes {
if size <= minSize {
minMig = mig
minSize = size
}
}
framework.ExpectNoError(framework.ResizeGroup(minMig, int32(1)))
framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount-minSize+1, resizeTimeout))
ngNodes, err := framework.GetGroupNodes(minMig)
framework.ExpectNoError(err)
Expect(len(ngNodes) == 1).To(BeTrue())
node, err := f.ClientSet.CoreV1().Nodes().Get(ngNodes[0], metav1.GetOptions{})
By(fmt.Sprintf("Target node for scale-down: %s", node.Name))
framework.ExpectNoError(err)
// this part is identical
drainNode(f, node)
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size < nodeCount-minSize+1 }, scaleDownTimeout))
// non-GKE only
newSize, err := framework.GroupSize(minMig)
framework.ExpectNoError(err)
Expect(newSize).Should(Equal(0))
}
It("Should be able to scale a node group down to 0[Feature:ClusterSizeAutoscalingScaleDown]", func() {
if framework.ProviderIs("gke") { // In GKE, we can just add a node pool
gkeScaleToZero()
} else if len(originalSizes) >= 2 {
gceScaleToZero()
} else {
framework.Skipf("At least 2 node groups are needed for scale-to-0 tests")
}
})
It("Shouldn't perform scale up operation and should list unhealthy status if most of the cluster is broken[Feature:ClusterSizeAutoscalingScaleUp]", func() {
clusterSize := nodeCount
for clusterSize < unhealthyClusterThreshold+1 {
clusterSize = manuallyIncreaseClusterSize(f, originalSizes)
}
By("Block network connectivity to some nodes to simulate unhealthy cluster")
nodesToBreakCount := int(math.Ceil(math.Max(float64(unhealthyClusterThreshold), 0.5*float64(clusterSize))))
nodes, err := f.ClientSet.CoreV1().Nodes().List(metav1.ListOptions{FieldSelector: fields.Set{
"spec.unschedulable": "false",
}.AsSelector().String()})
framework.ExpectNoError(err)
Expect(nodesToBreakCount <= len(nodes.Items)).To(BeTrue())
nodesToBreak := nodes.Items[:nodesToBreakCount]
// TestUnderTemporaryNetworkFailure only removes connectivity to a single node,
// and accepts func() callback. This is expanding the loop to recursive call
// to avoid duplicating TestUnderTemporaryNetworkFailure
var testFunction func()
testFunction = func() {
if len(nodesToBreak) > 0 {
ntb := &nodesToBreak[0]
nodesToBreak = nodesToBreak[1:]
framework.TestUnderTemporaryNetworkFailure(c, "default", ntb, testFunction)
} else {
ReserveMemory(f, "memory-reservation", 100, nodeCount*memAllocatableMb, false, defaultTimeout)
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "memory-reservation")
time.Sleep(scaleUpTimeout)
currentNodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
framework.Logf("Currently available nodes: %v, nodes available at the start of test: %v, disabled nodes: %v", len(currentNodes.Items), len(nodes.Items), nodesToBreakCount)
Expect(len(currentNodes.Items)).Should(Equal(len(nodes.Items) - nodesToBreakCount))
status, err := getClusterwideStatus(c)
framework.Logf("Clusterwide status: %v", status)
framework.ExpectNoError(err)
Expect(status).Should(Equal("Unhealthy"))
}
}
testFunction()
// Give nodes time to recover from network failure
framework.ExpectNoError(framework.WaitForReadyNodes(c, len(nodes.Items), nodesRecoverTimeout))
})
It("shouldn't scale up when expendable pod is created [Feature:ClusterSizeAutoscalingScaleUp]", func() {
defer createPriorityClasses(f)()
// Create nodesCountAfterResize+1 pods allocating 0.7 allocatable on present nodes. One more node will have to be created.
cleanupFunc := ReserveMemoryWithPriority(f, "memory-reservation", nodeCount+1, int(float64(nodeCount+1)*float64(0.7)*float64(memAllocatableMb)), false, time.Second, expendablePriorityClassName)
defer cleanupFunc()
By(fmt.Sprintf("Waiting for scale up hoping it won't happen, sleep for %s", scaleUpTimeout.String()))
time.Sleep(scaleUpTimeout)
// Verify that cluster size is not changed
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size == nodeCount }, time.Second))
})
It("should scale up when non expendable pod is created [Feature:ClusterSizeAutoscalingScaleUp]", func() {
defer createPriorityClasses(f)()
// Create nodesCountAfterResize+1 pods allocating 0.7 allocatable on present nodes. One more node will have to be created.
cleanupFunc := ReserveMemoryWithPriority(f, "memory-reservation", nodeCount+1, int(float64(nodeCount+1)*float64(0.7)*float64(memAllocatableMb)), true, scaleUpTimeout, highPriorityClassName)
defer cleanupFunc()
// Verify that cluster size is not changed
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size > nodeCount }, time.Second))
})
It("shouldn't scale up when expendable pod is preempted [Feature:ClusterSizeAutoscalingScaleUp]", func() {
defer createPriorityClasses(f)()
// Create nodesCountAfterResize pods allocating 0.7 allocatable on present nodes - one pod per node.
cleanupFunc1 := ReserveMemoryWithPriority(f, "memory-reservation1", nodeCount, int(float64(nodeCount)*float64(0.7)*float64(memAllocatableMb)), true, defaultTimeout, expendablePriorityClassName)
defer cleanupFunc1()
// Create nodesCountAfterResize pods allocating 0.7 allocatable on present nodes - one pod per node. Pods created here should preempt pods created above.
cleanupFunc2 := ReserveMemoryWithPriority(f, "memory-reservation2", nodeCount, int(float64(nodeCount)*float64(0.7)*float64(memAllocatableMb)), true, defaultTimeout, highPriorityClassName)
defer cleanupFunc2()
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size == nodeCount }, time.Second))
})
It("should scale down when expendable pod is running [Feature:ClusterSizeAutoscalingScaleDown]", func() {
defer createPriorityClasses(f)()
increasedSize := manuallyIncreaseClusterSize(f, originalSizes)
// Create increasedSize pods allocating 0.7 allocatable on present nodes - one pod per node.
cleanupFunc := ReserveMemoryWithPriority(f, "memory-reservation", increasedSize, int(float64(increasedSize)*float64(0.7)*float64(memAllocatableMb)), true, scaleUpTimeout, expendablePriorityClassName)
defer cleanupFunc()
By("Waiting for scale down")
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size == nodeCount }, scaleDownTimeout))
})
It("shouldn't scale down when non expendable pod is running [Feature:ClusterSizeAutoscalingScaleDown]", func() {
defer createPriorityClasses(f)()
increasedSize := manuallyIncreaseClusterSize(f, originalSizes)
// Create increasedSize pods allocating 0.7 allocatable on present nodes - one pod per node.
cleanupFunc := ReserveMemoryWithPriority(f, "memory-reservation", increasedSize, int(float64(increasedSize)*float64(0.7)*float64(memAllocatableMb)), true, scaleUpTimeout, highPriorityClassName)
defer cleanupFunc()
By(fmt.Sprintf("Waiting for scale down hoping it won't happen, sleep for %s", scaleDownTimeout.String()))
time.Sleep(scaleDownTimeout)
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size == increasedSize }, time.Second))
})
})
func installNvidiaDriversDaemonSet() {
By("Add daemonset which installs nvidia drivers")
// the link differs from one in GKE documentation; discussed with @mindprince this one should be used
framework.RunKubectlOrDie("apply", "-f", "https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/daemonset.yaml")
}
func execCmd(args ...string) *exec.Cmd {
klog.Infof("Executing: %s", strings.Join(args, " "))
return exec.Command(args[0], args[1:]...)
}
func runDrainTest(f *framework.Framework, migSizes map[string]int, namespace string, podsPerNode, pdbSize int, verifyFunction func(int)) {
increasedSize := manuallyIncreaseClusterSize(f, migSizes)
nodes, err := f.ClientSet.CoreV1().Nodes().List(metav1.ListOptions{FieldSelector: fields.Set{
"spec.unschedulable": "false",
}.AsSelector().String()})
framework.ExpectNoError(err)
numPods := len(nodes.Items) * podsPerNode
testID := string(uuid.NewUUID()) // So that we can label and find pods
labelMap := map[string]string{"test_id": testID}
framework.ExpectNoError(runReplicatedPodOnEachNode(f, nodes.Items, namespace, podsPerNode, "reschedulable-pods", labelMap, 0))
defer framework.DeleteRCAndWaitForGC(f.ClientSet, namespace, "reschedulable-pods")
By("Create a PodDisruptionBudget")
minAvailable := intstr.FromInt(numPods - pdbSize)
pdb := &policy.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: "test_pdb",
Namespace: namespace,
},
Spec: policy.PodDisruptionBudgetSpec{
Selector: &metav1.LabelSelector{MatchLabels: labelMap},
MinAvailable: &minAvailable,
},
}
_, err = f.ClientSet.PolicyV1beta1().PodDisruptionBudgets(namespace).Create(pdb)
defer func() {
f.ClientSet.PolicyV1beta1().PodDisruptionBudgets(namespace).Delete(pdb.Name, &metav1.DeleteOptions{})
}()
framework.ExpectNoError(err)
verifyFunction(increasedSize)
}
func getGKEURL(apiVersion string, suffix string) string {
out, err := execCmd("gcloud", "auth", "print-access-token").Output()
framework.ExpectNoError(err)
token := strings.Replace(string(out), "\n", "", -1)
return fmt.Sprintf("%s/%s/%s?access_token=%s",
gkeEndpoint,
apiVersion,
suffix,
token)
}
func getGKEClusterURL(apiVersion string) string {
if isRegionalCluster() {
// TODO(bskiba): Use locations API for all clusters once it's graduated to v1.
return getGKEURL(apiVersion, fmt.Sprintf("projects/%s/locations/%s/clusters/%s",
framework.TestContext.CloudConfig.ProjectID,
framework.TestContext.CloudConfig.Region,
framework.TestContext.CloudConfig.Cluster))
} else {
return getGKEURL(apiVersion, fmt.Sprintf("projects/%s/zones/%s/clusters/%s",
framework.TestContext.CloudConfig.ProjectID,
framework.TestContext.CloudConfig.Zone,
framework.TestContext.CloudConfig.Cluster))
}
}
func getCluster(apiVersion string) (string, error) {
resp, err := http.Get(getGKEClusterURL(apiVersion))
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("error: %s %s", resp.Status, body)
}
return string(body), nil
}
func isAutoscalerEnabled(expectedMaxNodeCountInTargetPool int) (bool, error) {
apiVersion := "v1"
if isRegionalCluster() {
apiVersion = "v1beta1"
}
strBody, err := getCluster(apiVersion)
if err != nil {
return false, err
}
if strings.Contains(strBody, "\"maxNodeCount\": "+strconv.Itoa(expectedMaxNodeCountInTargetPool)) {
return true, nil
}
return false, nil
}
func getClusterLocation() string {
if isRegionalCluster() {
return "--region=" + framework.TestContext.CloudConfig.Region
} else {
return "--zone=" + framework.TestContext.CloudConfig.Zone
}
}
func getGcloudCommandFromTrack(commandTrack string, args []string) []string {
command := []string{"gcloud"}
if commandTrack == "beta" || commandTrack == "alpha" {
command = append(command, commandTrack)
}
command = append(command, args...)
command = append(command, getClusterLocation())
command = append(command, "--project="+framework.TestContext.CloudConfig.ProjectID)
return command
}
func getGcloudCommand(args []string) []string {
track := ""
if isRegionalCluster() {
track = "beta"
}
return getGcloudCommandFromTrack(track, args)
}
func isRegionalCluster() bool {
// TODO(bskiba): Use an appropriate indicator that the cluster is regional.
return framework.TestContext.CloudConfig.MultiZone
}
func enableAutoscaler(nodePool string, minCount, maxCount int) error {
klog.Infof("Using gcloud to enable autoscaling for pool %s", nodePool)
args := []string{"container", "clusters", "update", framework.TestContext.CloudConfig.Cluster,
"--enable-autoscaling",
"--min-nodes=" + strconv.Itoa(minCount),
"--max-nodes=" + strconv.Itoa(maxCount),
"--node-pool=" + nodePool}
output, err := execCmd(getGcloudCommand(args)...).CombinedOutput()
if err != nil {
klog.Errorf("Failed config update result: %s", output)
return fmt.Errorf("Failed to enable autoscaling: %v", err)
}
klog.Infof("Config update result: %s", output)
var finalErr error
for startTime := time.Now(); startTime.Add(gkeUpdateTimeout).After(time.Now()); time.Sleep(30 * time.Second) {
val, err := isAutoscalerEnabled(maxCount)
if err == nil && val {
return nil
}
finalErr = err
}
return fmt.Errorf("autoscaler not enabled, last error: %v", finalErr)
}
func disableAutoscaler(nodePool string, minCount, maxCount int) error {
klog.Infof("Using gcloud to disable autoscaling for pool %s", nodePool)
args := []string{"container", "clusters", "update", framework.TestContext.CloudConfig.Cluster,
"--no-enable-autoscaling",
"--node-pool=" + nodePool}
output, err := execCmd(getGcloudCommand(args)...).CombinedOutput()
if err != nil {
klog.Errorf("Failed config update result: %s", output)
return fmt.Errorf("Failed to disable autoscaling: %v", err)
}
klog.Infof("Config update result: %s", output)
var finalErr error
for startTime := time.Now(); startTime.Add(gkeUpdateTimeout).After(time.Now()); time.Sleep(30 * time.Second) {
val, err := isAutoscalerEnabled(maxCount)
if err == nil && !val {
return nil
}
finalErr = err
}
return fmt.Errorf("autoscaler still enabled, last error: %v", finalErr)
}
func executeHTTPRequest(method string, url string, body string) (string, error) {
client := &http.Client{}
req, err := http.NewRequest(method, url, strings.NewReader(body))
if err != nil {
By(fmt.Sprintf("Can't create request: %s", err.Error()))
return "", err
}
resp, err := client.Do(req)
respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("error: %s %s", resp.Status, string(respBody))
}
return string(respBody), nil
}
func addNodePool(name string, machineType string, numNodes int) {
args := []string{"container", "node-pools", "create", name, "--quiet",
"--machine-type=" + machineType,
"--num-nodes=" + strconv.Itoa(numNodes),
"--cluster=" + framework.TestContext.CloudConfig.Cluster}
output, err := execCmd(getGcloudCommand(args)...).CombinedOutput()
klog.Infof("Creating node-pool %s: %s", name, output)
framework.ExpectNoError(err, string(output))
}
func addGpuNodePool(name string, gpuType string, gpuCount int, numNodes int) {
args := []string{"beta", "container", "node-pools", "create", name, "--quiet",
"--accelerator", "type=" + gpuType + ",count=" + strconv.Itoa(gpuCount),
"--num-nodes=" + strconv.Itoa(numNodes),
"--cluster=" + framework.TestContext.CloudConfig.Cluster}
output, err := execCmd(getGcloudCommand(args)...).CombinedOutput()
klog.Infof("Creating node-pool %s: %s", name, output)
framework.ExpectNoError(err, string(output))
}
func deleteNodePool(name string) {
klog.Infof("Deleting node pool %s", name)
args := []string{"container", "node-pools", "delete", name, "--quiet",
"--cluster=" + framework.TestContext.CloudConfig.Cluster}
err := wait.ExponentialBackoff(
wait.Backoff{Duration: 1 * time.Minute, Factor: float64(3), Steps: 3},
func() (bool, error) {
output, err := execCmd(getGcloudCommand(args)...).CombinedOutput()
if err != nil {
klog.Warningf("Error deleting nodegroup - error:%v, output: %s", err, output)
return false, nil
}
klog.Infof("Node-pool deletion output: %s", output)
return true, nil
})
framework.ExpectNoError(err)
}
func getPoolNodes(f *framework.Framework, poolName string) []*v1.Node {
nodes := make([]*v1.Node, 0, 1)
nodeList := framework.GetReadyNodesIncludingTaintedOrDie(f.ClientSet)
for _, node := range nodeList.Items {
if node.Labels[gkeNodepoolNameKey] == poolName {
nodes = append(nodes, &node)
}
}
return nodes
}
// getPoolInitialSize returns the initial size of the node pool taking into
// account that it may span multiple zones. In that case, node pool consists of
// multiple migs all containing initialNodeCount nodes.
func getPoolInitialSize(poolName string) int {
// get initial node count
args := []string{"container", "node-pools", "describe", poolName, "--quiet",
"--cluster=" + framework.TestContext.CloudConfig.Cluster,
"--format=value(initialNodeCount)"}
output, err := execCmd(getGcloudCommand(args)...).CombinedOutput()
klog.Infof("Node-pool initial size: %s", output)
framework.ExpectNoError(err, string(output))
fields := strings.Fields(string(output))
Expect(len(fields)).Should(Equal(1))
size, err := strconv.ParseInt(fields[0], 10, 64)
framework.ExpectNoError(err)
// get number of node pools
args = []string{"container", "node-pools", "describe", poolName, "--quiet",
"--cluster=" + framework.TestContext.CloudConfig.Cluster,
"--format=value(instanceGroupUrls)"}
output, err = execCmd(getGcloudCommand(args)...).CombinedOutput()
framework.ExpectNoError(err, string(output))
nodeGroupCount := len(strings.Split(string(output), ";"))
return int(size) * nodeGroupCount
}
func getPoolSize(f *framework.Framework, poolName string) int {
size := 0
nodeList := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
for _, node := range nodeList.Items {
if node.Labels[gkeNodepoolNameKey] == poolName {
size++
}
}
return size
}
func doPut(url, content string) (string, error) {
req, err := http.NewRequest("PUT", url, bytes.NewBuffer([]byte(content)))
if err != nil {
return "", err
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
strBody := string(body)
return strBody, nil
}
func reserveMemory(f *framework.Framework, id string, replicas, megabytes int, expectRunning bool, timeout time.Duration, selector map[string]string, tolerations []v1.Toleration, priorityClassName string) func() error {
By(fmt.Sprintf("Running RC which reserves %v MB of memory", megabytes))
request := int64(1024 * 1024 * megabytes / replicas)
config := &testutils.RCConfig{
Client: f.ClientSet,
InternalClient: f.InternalClientset,
Name: id,
Namespace: f.Namespace.Name,
Timeout: timeout,
Image: imageutils.GetPauseImageName(),
Replicas: replicas,
MemRequest: request,
NodeSelector: selector,
Tolerations: tolerations,
PriorityClassName: priorityClassName,
}
for start := time.Now(); time.Since(start) < rcCreationRetryTimeout; time.Sleep(rcCreationRetryDelay) {
err := framework.RunRC(*config)
if err != nil && strings.Contains(err.Error(), "Error creating replication controller") {
klog.Warningf("Failed to create memory reservation: %v", err)
continue
}
if expectRunning {
framework.ExpectNoError(err)
}
return func() error {
return framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, id)
}
}
framework.Failf("Failed to reserve memory within timeout")
return nil
}
// ReserveMemoryWithPriority creates a replication controller with pods with priority that, in summation,
// request the specified amount of memory.
func ReserveMemoryWithPriority(f *framework.Framework, id string, replicas, megabytes int, expectRunning bool, timeout time.Duration, priorityClassName string) func() error {
return reserveMemory(f, id, replicas, megabytes, expectRunning, timeout, nil, nil, priorityClassName)
}
// ReserveMemoryWithSelector creates a replication controller with pods with node selector that, in summation,
// request the specified amount of memory.
func ReserveMemoryWithSelectorAndTolerations(f *framework.Framework, id string, replicas, megabytes int, expectRunning bool, timeout time.Duration, selector map[string]string, tolerations []v1.Toleration) func() error {
return reserveMemory(f, id, replicas, megabytes, expectRunning, timeout, selector, tolerations, "")
}
// ReserveMemory creates a replication controller with pods that, in summation,
// request the specified amount of memory.
func ReserveMemory(f *framework.Framework, id string, replicas, megabytes int, expectRunning bool, timeout time.Duration) func() error {
return reserveMemory(f, id, replicas, megabytes, expectRunning, timeout, nil, nil, "")
}
// WaitForClusterSizeFunc waits until the cluster size matches the given function.
func WaitForClusterSizeFunc(c clientset.Interface, sizeFunc func(int) bool, timeout time.Duration) error {
return WaitForClusterSizeFuncWithUnready(c, sizeFunc, timeout, 0)
}
// WaitForClusterSizeFuncWithUnready waits until the cluster size matches the given function and assumes some unready nodes.
func WaitForClusterSizeFuncWithUnready(c clientset.Interface, sizeFunc func(int) bool, timeout time.Duration, expectedUnready int) error {
for start := time.Now(); time.Since(start) < timeout; time.Sleep(20 * time.Second) {
nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{FieldSelector: fields.Set{
"spec.unschedulable": "false",
}.AsSelector().String()})
if err != nil {
klog.Warningf("Failed to list nodes: %v", err)
continue
}
numNodes := len(nodes.Items)
// Filter out not-ready nodes.
framework.FilterNodes(nodes, func(node v1.Node) bool {
return framework.IsNodeConditionSetAsExpected(&node, v1.NodeReady, true)
})
numReady := len(nodes.Items)
if numNodes == numReady+expectedUnready && sizeFunc(numNodes) {
klog.Infof("Cluster has reached the desired size")
return nil
}
klog.Infof("Waiting for cluster with func, current size %d, not ready nodes %d", numNodes, numNodes-numReady)
}
return fmt.Errorf("timeout waiting %v for appropriate cluster size", timeout)
}
func waitForCaPodsReadyInNamespace(f *framework.Framework, c clientset.Interface, tolerateUnreadyCount int) error {
var notready []string
for start := time.Now(); time.Now().Before(start.Add(scaleUpTimeout)); time.Sleep(20 * time.Second) {
pods, err := c.CoreV1().Pods(f.Namespace.Name).List(metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to get pods: %v", err)
}
notready = make([]string, 0)
for _, pod := range pods.Items {
ready := false
for _, c := range pod.Status.Conditions {
if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
ready = true
}
}
// Failed pods in this context generally mean that they have been
// double scheduled onto a node, but then failed a constraint check.
if pod.Status.Phase == v1.PodFailed {
klog.Warningf("Pod has failed: %v", pod)
}
if !ready && pod.Status.Phase != v1.PodFailed {
notready = append(notready, pod.Name)
}
}
if len(notready) <= tolerateUnreadyCount {
klog.Infof("sufficient number of pods ready. Tolerating %d unready", tolerateUnreadyCount)
return nil
}
klog.Infof("Too many pods are not ready yet: %v", notready)
}
klog.Info("Timeout on waiting for pods being ready")
klog.Info(framework.RunKubectlOrDie("get", "pods", "-o", "json", "--all-namespaces"))
klog.Info(framework.RunKubectlOrDie("get", "nodes", "-o", "json"))
// Some pods are still not running.
return fmt.Errorf("Too many pods are still not running: %v", notready)
}
func waitForAllCaPodsReadyInNamespace(f *framework.Framework, c clientset.Interface) error {
return waitForCaPodsReadyInNamespace(f, c, 0)
}
func getAnyNode(c clientset.Interface) *v1.Node {
nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{FieldSelector: fields.Set{
"spec.unschedulable": "false",
}.AsSelector().String()})
if err != nil {
klog.Errorf("Failed to get node list: %v", err)
return nil
}
if len(nodes.Items) == 0 {
klog.Errorf("No nodes")
return nil
}
return &nodes.Items[0]
}
func setMigSizes(sizes map[string]int) bool {
madeChanges := false
for mig, desiredSize := range sizes {
currentSize, err := framework.GroupSize(mig)
framework.ExpectNoError(err)
if desiredSize != currentSize {
By(fmt.Sprintf("Setting size of %s to %d", mig, desiredSize))
err = framework.ResizeGroup(mig, int32(desiredSize))
framework.ExpectNoError(err)
madeChanges = true
}
}
return madeChanges
}
func drainNode(f *framework.Framework, node *v1.Node) {
By("Make the single node unschedulable")
makeNodeUnschedulable(f.ClientSet, node)
By("Manually drain the single node")
podOpts := metav1.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, node.Name).String()}
pods, err := f.ClientSet.CoreV1().Pods(metav1.NamespaceAll).List(podOpts)
framework.ExpectNoError(err)
for _, pod := range pods.Items {
err = f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(pod.Name, metav1.NewDeleteOptions(0))
framework.ExpectNoError(err)
}
}
func makeNodeUnschedulable(c clientset.Interface, node *v1.Node) error {
By(fmt.Sprintf("Taint node %s", node.Name))
for j := 0; j < 3; j++ {
freshNode, err := c.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
if err != nil {
return err
}
for _, taint := range freshNode.Spec.Taints {
if taint.Key == disabledTaint {
return nil
}
}
freshNode.Spec.Taints = append(freshNode.Spec.Taints, v1.Taint{
Key: disabledTaint,
Value: "DisabledForTest",
Effect: v1.TaintEffectNoSchedule,
})
_, err = c.CoreV1().Nodes().Update(freshNode)
if err == nil {
return nil
}
if !errors.IsConflict(err) {
return err
}
klog.Warningf("Got 409 conflict when trying to taint node, retries left: %v", 3-j)
}
return fmt.Errorf("Failed to taint node in allowed number of retries")
}
// CriticalAddonsOnlyError implements the `error` interface, and signifies the
// presence of the `CriticalAddonsOnly` taint on the node.
type CriticalAddonsOnlyError struct{}
func (CriticalAddonsOnlyError) Error() string {
return fmt.Sprintf("CriticalAddonsOnly taint found on node")
}
func makeNodeSchedulable(c clientset.Interface, node *v1.Node, failOnCriticalAddonsOnly bool) error {
By(fmt.Sprintf("Remove taint from node %s", node.Name))
for j := 0; j < 3; j++ {
freshNode, err := c.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
if err != nil {
return err
}
var newTaints []v1.Taint
for _, taint := range freshNode.Spec.Taints {
if failOnCriticalAddonsOnly && taint.Key == criticalAddonsOnlyTaint {
return CriticalAddonsOnlyError{}
}
if taint.Key != disabledTaint {
newTaints = append(newTaints, taint)
}
}
if len(newTaints) == len(freshNode.Spec.Taints) {
return nil
}
freshNode.Spec.Taints = newTaints
_, err = c.CoreV1().Nodes().Update(freshNode)
if err == nil {
return nil
}
if !errors.IsConflict(err) {
return err
}
klog.Warningf("Got 409 conflict when trying to taint node, retries left: %v", 3-j)
}
return fmt.Errorf("Failed to remove taint from node in allowed number of retries")
}
// ScheduleAnySingleGpuPod schedules a pod which requires single GPU of any type
func ScheduleAnySingleGpuPod(f *framework.Framework, id string) error {
return ScheduleGpuPod(f, id, "", 1)
}
// ScheduleGpuPod schedules a pod which requires a given number of gpus of given type
func ScheduleGpuPod(f *framework.Framework, id string, gpuType string, gpuLimit int64) error {
config := &testutils.RCConfig{
Client: f.ClientSet,
InternalClient: f.InternalClientset,
Name: id,
Namespace: f.Namespace.Name,
Timeout: 3 * scaleUpTimeout, // spinning up GPU node is slow
Image: imageutils.GetPauseImageName(),
Replicas: 1,
GpuLimit: gpuLimit,
Labels: map[string]string{"requires-gpu": "yes"},
}
if gpuType != "" {
config.NodeSelector = map[string]string{gpuLabel: gpuType}
}
err := framework.RunRC(*config)
if err != nil {
return err
}
return nil
}
// Create an RC running a given number of pods with anti-affinity
func runAntiAffinityPods(f *framework.Framework, namespace string, pods int, id string, podLabels, antiAffinityLabels map[string]string) error {
config := &testutils.RCConfig{
Affinity: buildAntiAffinity(antiAffinityLabels),
Client: f.ClientSet,
InternalClient: f.InternalClientset,
Name: id,
Namespace: namespace,
Timeout: scaleUpTimeout,
Image: imageutils.GetPauseImageName(),
Replicas: pods,
Labels: podLabels,
}
err := framework.RunRC(*config)
if err != nil {
return err
}
_, err = f.ClientSet.CoreV1().ReplicationControllers(namespace).Get(id, metav1.GetOptions{})
if err != nil {
return err
}
return nil
}
func runVolumeAntiAffinityPods(f *framework.Framework, namespace string, pods int, id string, podLabels, antiAffinityLabels map[string]string, volumes []v1.Volume) error {
config := &testutils.RCConfig{
Affinity: buildAntiAffinity(antiAffinityLabels),
Volumes: volumes,
Client: f.ClientSet,
InternalClient: f.InternalClientset,
Name: id,
Namespace: namespace,
Timeout: scaleUpTimeout,
Image: imageutils.GetPauseImageName(),
Replicas: pods,
Labels: podLabels,
}
err := framework.RunRC(*config)
if err != nil {
return err
}
_, err = f.ClientSet.CoreV1().ReplicationControllers(namespace).Get(id, metav1.GetOptions{})
if err != nil {
return err
}
return nil
}
var emptyDirVolumes = []v1.Volume{
{
Name: "empty-volume",
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
},
},
}
func buildVolumes(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) []v1.Volume {
return []v1.Volume{
{
Name: pv.Name,
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc.Name,
ReadOnly: false,
},
},
},
}
}
func buildAntiAffinity(labels map[string]string) *v1.Affinity {
return &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: labels,
},
TopologyKey: "kubernetes.io/hostname",
},
},
},
}
}
// Create an RC running a given number of pods on each node without adding any constraint forcing
// such pod distribution. This is meant to create a bunch of underutilized (but not unused) nodes
// with pods that can be rescheduled on different nodes.
// This is achieved using the following method:
// 1. disable scheduling on each node
// 2. create an empty RC
// 3. for each node:
// 3a. enable scheduling on that node
// 3b. increase number of replicas in RC by podsPerNode
func runReplicatedPodOnEachNode(f *framework.Framework, nodes []v1.Node, namespace string, podsPerNode int, id string, labels map[string]string, memRequest int64) error {
By("Run a pod on each node")
for _, node := range nodes {
err := makeNodeUnschedulable(f.ClientSet, &node)
defer func(n v1.Node) {
makeNodeSchedulable(f.ClientSet, &n, false)
}(node)
if err != nil {
return err
}
}
config := &testutils.RCConfig{
Client: f.ClientSet,
InternalClient: f.InternalClientset,
Name: id,
Namespace: namespace,
Timeout: defaultTimeout,
Image: imageutils.GetPauseImageName(),
Replicas: 0,
Labels: labels,
MemRequest: memRequest,
}
err := framework.RunRC(*config)
if err != nil {
return err
}
rc, err := f.ClientSet.CoreV1().ReplicationControllers(namespace).Get(id, metav1.GetOptions{})
if err != nil {
return err
}
for i, node := range nodes {
err = makeNodeSchedulable(f.ClientSet, &node, false)
if err != nil {
return err
}
// Update replicas count, to create new pods that will be allocated on node
// (we retry 409 errors in case rc reference got out of sync)
for j := 0; j < 3; j++ {
*rc.Spec.Replicas = int32((i + 1) * podsPerNode)
rc, err = f.ClientSet.CoreV1().ReplicationControllers(namespace).Update(rc)
if err == nil {
break
}
if !errors.IsConflict(err) {
return err
}
klog.Warningf("Got 409 conflict when trying to scale RC, retries left: %v", 3-j)
rc, err = f.ClientSet.CoreV1().ReplicationControllers(namespace).Get(id, metav1.GetOptions{})
if err != nil {
return err
}
}
err = wait.PollImmediate(5*time.Second, podTimeout, func() (bool, error) {
rc, err = f.ClientSet.CoreV1().ReplicationControllers(namespace).Get(id, metav1.GetOptions{})
if err != nil || rc.Status.ReadyReplicas < int32((i+1)*podsPerNode) {
return false, nil
}
return true, nil
})
if err != nil {
return fmt.Errorf("failed to coerce RC into spawning a pod on node %s within timeout", node.Name)
}
err = makeNodeUnschedulable(f.ClientSet, &node)
if err != nil {
return err
}
}
return nil
}
// wrap runReplicatedPodOnEachNode to return cleanup
func runReplicatedPodOnEachNodeWithCleanup(f *framework.Framework, nodes []v1.Node, namespace string, podsPerNode int, id string, labels map[string]string, memRequest int64) (func(), error) {
err := runReplicatedPodOnEachNode(f, nodes, namespace, podsPerNode, id, labels, memRequest)
return func() {
framework.DeleteRCAndWaitForGC(f.ClientSet, namespace, id)
}, err
}
// Increase cluster size by newNodesForScaledownTests to create some unused nodes
// that can be later removed by cluster autoscaler.
func manuallyIncreaseClusterSize(f *framework.Framework, originalSizes map[string]int) int {
By("Manually increase cluster size")
increasedSize := 0
newSizes := make(map[string]int)
for key, val := range originalSizes {
newSizes[key] = val + newNodesForScaledownTests
increasedSize += val + newNodesForScaledownTests
}
setMigSizes(newSizes)
checkClusterSize := func(size int) bool {
if size >= increasedSize {
return true
}
resized := setMigSizes(newSizes)
if resized {
klog.Warning("Unexpected node group size while waiting for cluster resize. Setting size to target again.")
}
return false
}
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet, checkClusterSize, manualResizeTimeout))
return increasedSize
}
// Try to get clusterwide health from CA status configmap.
// Status configmap is not parsing-friendly, so evil regexpery follows.
func getClusterwideStatus(c clientset.Interface) (string, error) {
configMap, err := c.CoreV1().ConfigMaps("kube-system").Get("cluster-autoscaler-status", metav1.GetOptions{})
if err != nil {
return "", err
}
status, ok := configMap.Data["status"]
if !ok {
return "", fmt.Errorf("Status information not found in configmap")
}
matcher, err := regexp.Compile("Cluster-wide:\\s*\n\\s*Health:\\s*([A-Za-z]+)")
if err != nil {
return "", err
}
result := matcher.FindStringSubmatch(status)
if len(result) < 2 {
return "", fmt.Errorf("Failed to parse CA status configmap, raw status: %v", status)
}
return result[1], nil
}
type scaleUpStatus struct {
status string
ready int
target int
timestamp time.Time
}
// Try to get timestamp from status.
// Status configmap is not parsing-friendly, so evil regexpery follows.
func getStatusTimestamp(status string) (time.Time, error) {
timestampMatcher, err := regexp.Compile("Cluster-autoscaler status at \\s*([0-9\\-]+ [0-9]+:[0-9]+:[0-9]+\\.[0-9]+ \\+[0-9]+ [A-Za-z]+)")
if err != nil {
return time.Time{}, err
}
timestampMatch := timestampMatcher.FindStringSubmatch(status)
if len(timestampMatch) < 2 {
return time.Time{}, fmt.Errorf("Failed to parse CA status timestamp, raw status: %v", status)
}
timestamp, err := time.Parse(timestampFormat, timestampMatch[1])
if err != nil {
return time.Time{}, err
}
return timestamp, nil
}
// Try to get scaleup statuses of all node groups.
// Status configmap is not parsing-friendly, so evil regexpery follows.
func getScaleUpStatus(c clientset.Interface) (*scaleUpStatus, error) {
configMap, err := c.CoreV1().ConfigMaps("kube-system").Get("cluster-autoscaler-status", metav1.GetOptions{})
if err != nil {
return nil, err
}
status, ok := configMap.Data["status"]
if !ok {
return nil, fmt.Errorf("Status information not found in configmap")
}
timestamp, err := getStatusTimestamp(status)
if err != nil {
return nil, err
}
matcher, err := regexp.Compile("s*ScaleUp:\\s*([A-Za-z]+)\\s*\\(ready=([0-9]+)\\s*cloudProviderTarget=([0-9]+)\\s*\\)")
if err != nil {
return nil, err
}
matches := matcher.FindAllStringSubmatch(status, -1)
if len(matches) < 1 {
return nil, fmt.Errorf("Failed to parse CA status configmap, raw status: %v", status)
}
result := scaleUpStatus{
status: caNoScaleUpStatus,
ready: 0,
target: 0,
timestamp: timestamp,
}
for _, match := range matches {
if match[1] == caOngoingScaleUpStatus {
result.status = caOngoingScaleUpStatus
}
newReady, err := strconv.Atoi(match[2])
if err != nil {
return nil, err
}
result.ready += newReady
newTarget, err := strconv.Atoi(match[3])
if err != nil {
return nil, err
}
result.target += newTarget
}
klog.Infof("Cluster-Autoscaler scale-up status: %v (%v, %v)", result.status, result.ready, result.target)
return &result, nil
}
func waitForScaleUpStatus(c clientset.Interface, cond func(s *scaleUpStatus) bool, timeout time.Duration) (*scaleUpStatus, error) {
var finalErr error
var status *scaleUpStatus
err := wait.PollImmediate(5*time.Second, timeout, func() (bool, error) {
status, finalErr = getScaleUpStatus(c)
if finalErr != nil {
return false, nil
}
if status.timestamp.Add(freshStatusLimit).Before(time.Now()) {
// stale status
finalErr = fmt.Errorf("Status too old")
return false, nil
}
return cond(status), nil
})
if err != nil {
err = fmt.Errorf("Failed to find expected scale up status: %v, last status: %v, final err: %v", err, status, finalErr)
}
return status, err
}
// This is a temporary fix to allow CA to migrate some kube-system pods
// TODO: Remove this when the PDB is added for some of those components
func addKubeSystemPdbs(f *framework.Framework) (func(), error) {
By("Create PodDisruptionBudgets for kube-system components, so they can be migrated if required")
var newPdbs []string
cleanup := func() {
var finalErr error
for _, newPdbName := range newPdbs {
By(fmt.Sprintf("Delete PodDisruptionBudget %v", newPdbName))
err := f.ClientSet.PolicyV1beta1().PodDisruptionBudgets("kube-system").Delete(newPdbName, &metav1.DeleteOptions{})
if err != nil {
// log error, but attempt to remove other pdbs
klog.Errorf("Failed to delete PodDisruptionBudget %v, err: %v", newPdbName, err)
finalErr = err
}
}
if finalErr != nil {
framework.Failf("Error during PodDisruptionBudget cleanup: %v", finalErr)
}
}
type pdbInfo struct {
label string
minAvailable int
}
pdbsToAdd := []pdbInfo{
{label: "kube-dns", minAvailable: 1},
{label: "kube-dns-autoscaler", minAvailable: 0},
{label: "metrics-server", minAvailable: 0},
{label: "kubernetes-dashboard", minAvailable: 0},
{label: "glbc", minAvailable: 0},
}
for _, pdbData := range pdbsToAdd {
By(fmt.Sprintf("Create PodDisruptionBudget for %v", pdbData.label))
labelMap := map[string]string{"k8s-app": pdbData.label}
pdbName := fmt.Sprintf("test-pdb-for-%v", pdbData.label)
minAvailable := intstr.FromInt(pdbData.minAvailable)
pdb := &policy.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: pdbName,
Namespace: "kube-system",
},
Spec: policy.PodDisruptionBudgetSpec{
Selector: &metav1.LabelSelector{MatchLabels: labelMap},
MinAvailable: &minAvailable,
},
}
_, err := f.ClientSet.PolicyV1beta1().PodDisruptionBudgets("kube-system").Create(pdb)
newPdbs = append(newPdbs, pdbName)
if err != nil {
return cleanup, err
}
}
return cleanup, nil
}
func createPriorityClasses(f *framework.Framework) func() {
priorityClasses := map[string]int32{
expendablePriorityClassName: -15,
highPriorityClassName: 1000,
}
for className, priority := range priorityClasses {
_, err := f.ClientSet.SchedulingV1beta1().PriorityClasses().Create(&schedulerapi.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: className}, Value: priority})
if err != nil {
klog.Errorf("Error creating priority class: %v", err)
}
Expect(err == nil || errors.IsAlreadyExists(err)).To(Equal(true))
}
return func() {
for className := range priorityClasses {
err := f.ClientSet.SchedulingV1beta1().PriorityClasses().Delete(className, nil)
if err != nil {
klog.Errorf("Error deleting priority class: %v", err)
}
}
}
}