/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package preemption_test
import (
"fmt"
"strings"
"time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
tests "github.com/apache/yunikorn-k8shim/test/e2e"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
)
var kClient k8s.KubeCtl
var restClient yunikorn.RClient
var ns *v1.Namespace
var dev string
var oldConfigMap = new(v1.ConfigMap)
var annotation = "ann-" + common.RandSeq(10)
// Nodes
var Worker = ""
var WorkerMemRes int64
var sleepPodMemLimit int64
var sleepPodMemLimit2 int64
var taintKey = "e2e_test_preemption"
var nodesToTaint []string
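// Suite-wide fixtures: BeforeSuite selects a single untainted worker node,
// records its available memory in WorkerMemRes, and derives the sleep pod
// memory limits from it so each test can saturate that one node deterministically.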
var _ = ginkgo.BeforeSuite(func() {
// Initializing kubectl client
kClient = k8s.KubeCtl{}
Ω(kClient.SetClient()).To(gomega.BeNil())
// Initializing rest client
restClient = yunikorn.RClient{}
Ω(restClient).NotTo(gomega.BeNil())
yunikorn.EnsureYuniKornConfigsPresent()
ginkgo.By("Port-forward the scheduler pod")
var err = kClient.PortForwardYkSchedulerPod()
Ω(err).NotTo(gomega.HaveOccurred())
var nodes *v1.NodeList
nodes, err = kClient.GetNodes()
Ω(err).NotTo(gomega.HaveOccurred())
Ω(len(nodes.Items)).NotTo(gomega.BeZero(), "Nodes can't be empty")
// Extract node allocatable resources
for _, node := range nodes.Items {
// skip master if it's marked as such
node := node
if k8s.IsMasterNode(&node) || !k8s.IsComputeNode(&node) {
continue
}
if Worker == "" {
Worker = node.Name
} else {
nodesToTaint = append(nodesToTaint, node.Name)
}
}
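// The first schedulable compute node becomes the test worker; every other
// compute node is collected in nodesToTaint and tainted below so all sleep
// pods are forced onto the single worker node.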
Ω(Worker).NotTo(gomega.BeEmpty(), "Worker node not found")
ginkgo.By("Tainting some nodes..")
err = kClient.TaintNodes(nodesToTaint, taintKey, "value", v1.TaintEffectNoSchedule)
Ω(err).NotTo(gomega.HaveOccurred())
nodesDAOInfo, err := restClient.GetNodes(constants.DefaultPartition)
Ω(err).NotTo(gomega.HaveOccurred())
Ω(nodesDAOInfo).NotTo(gomega.BeNil())
for _, node := range *nodesDAOInfo {
if node.NodeID == Worker {
WorkerMemRes = node.Available["memory"]
}
}
WorkerMemRes /= (1000 * 1000) // convert to M (megabytes)
fmt.Fprintf(ginkgo.GinkgoWriter, "Worker node %s available memory %dM\n", Worker, WorkerMemRes)
sleepPodMemLimit = int64(float64(WorkerMemRes) / 3)
Ω(sleepPodMemLimit).NotTo(gomega.BeZero(), "Sleep pod memory limit cannot be zero")
fmt.Fprintf(ginkgo.GinkgoWriter, "Sleep pod limit memory %dM\n", sleepPodMemLimit)
sleepPodMemLimit2 = int64(float64(WorkerMemRes) / 4)
Ω(sleepPodMemLimit2).NotTo(gomega.BeZero(), "Sleep pod memory limit cannot be zero")
fmt.Fprintf(ginkgo.GinkgoWriter, "Sleep pod limit memory %dM\n", sleepPodMemLimit2)
})
var _ = ginkgo.AfterSuite(func() {
ginkgo.By("Untainting some nodes")
err := kClient.UntaintNodes(nodesToTaint, taintKey)
Ω(err).NotTo(gomega.HaveOccurred(), "Could not remove taint from nodes "+strings.Join(nodesToTaint, ","))
ginkgo.By("Check Yunikorn's health")
checks, err := yunikorn.GetFailedHealthChecks()
Ω(err).NotTo(gomega.HaveOccurred())
Ω(checks).To(gomega.Equal(""), checks)
})
var _ = ginkgo.Describe("Preemption", func() {
ginkgo.BeforeEach(func() {
dev = "dev-" + common.RandSeq(5)
ginkgo.By("Creating development namespace: " + dev)
var err error
ns, err = kClient.CreateNamespace(dev, nil)
Ω(err).NotTo(HaveOccurred())
Ω(ns.Status.Phase).To(gomega.Equal(v1.NamespaceActive))
})
ginkgo.It("Verify_basic_preemption", func() {
ginkgo.By("A queue uses resource more than the guaranteed value even after removing one of the pods. The cluster doesn't have enough resource to deploy a pod in another queue which uses resource less than the guaranteed value.")
// update config
ginkgo.By(fmt.Sprintf("Update root.sandbox1 and root.sandbox2 with guaranteed memory %dM", sleepPodMemLimit))
annotation = "ann-" + common.RandSeq(10)
yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", annotation, func(sc *configs.SchedulerConfig) error {
// remove placement rules so we can control queue
sc.Partitions[0].PlacementRules = nil
var err error
if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "sandbox1",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
Properties: map[string]string{"preemption.delay": "1s"},
}); err != nil {
return err
}
if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "sandbox2",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
Properties: map[string]string{"preemption.delay": "1s"},
}); err != nil {
return err
}
return nil
})
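// Scenario: three pods of sleepPodMemLimit each fill the worker node from
// root.sandbox1 (above its guarantee of a single pod's memory), then a fourth
// pod is submitted to root.sandbox2, which is below its guarantee. YuniKorn is
// expected to preempt one root.sandbox1 pod to make room for it.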
// Define sleepPod
sleepPodConfigs := createSandbox1SleepPodConfigs(3, 600)
sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit, Time: 600, Optedout: k8s.Allow, Labels: map[string]string{"queue": "root.sandbox2"}}
sleepPodConfigs = append(sleepPodConfigs, sleepPod4Config)
for _, config := range sleepPodConfigs {
ginkgo.By("Deploy the sleep pod " + config.Name + " to the development namespace")
sleepObj, podErr := k8s.InitSleepPod(config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
// Wait for pod to move to running state
podErr = kClient.WaitForPodBySelectorRunning(dev,
fmt.Sprintf("app=%s", sleepRespPod.ObjectMeta.Labels["app"]),
60)
gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
}
// assert one of the pods in root.sandbox1 is preempted
ginkgo.By("One of the pods in root.sanbox1 is preempted")
sandbox1RunningPodsCnt := 0
pods, err := kClient.ListPodsByLabelSelector(dev, "queue=root.sandbox1")
gomega.Ω(err).NotTo(gomega.HaveOccurred())
for _, pod := range pods.Items {
if pod.DeletionTimestamp != nil {
continue
}
if pod.Status.Phase == v1.PodRunning {
sandbox1RunningPodsCnt++
}
}
Ω(sandbox1RunningPodsCnt).To(gomega.Equal(2), "One of the pods in root.sandbox1 should be preempted")
})
ginkgo.It("Verify_no_preemption_on_resources_less_than_guaranteed_value", func() {
ginkgo.By("A queue uses resource less than the guaranteed value can't be preempted.")
// update config
ginkgo.By(fmt.Sprintf("Update root.sandbox1 and root.sandbox2 with guaranteed memory %dM", WorkerMemRes))
annotation = "ann-" + common.RandSeq(10)
yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", annotation, func(sc *configs.SchedulerConfig) error {
// remove placement rules so we can control queue
sc.Partitions[0].PlacementRules = nil
var err error
if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "sandbox1",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", WorkerMemRes)}},
Properties: map[string]string{"preemption.delay": "1s"},
}); err != nil {
return err
}
if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "sandbox2",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", WorkerMemRes)}},
Properties: map[string]string{"preemption.delay": "1s"},
}); err != nil {
return err
}
return nil
})
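// Scenario: both queues are guaranteed the full worker memory, so root.sandbox1
// never exceeds its guarantee and none of its pods may be preempted. The
// root.sandbox2 pod must stay pending until the root.sandbox1 pods finish.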
// Define sleepPod
sandbox1SleepPodConfigs := createSandbox1SleepPodConfigs(3, 30)
sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit, Time: 30, Optedout: k8s.Allow, Labels: map[string]string{"queue": "root.sandbox2"}}
// Deploy pods in root.sandbox1
for _, config := range sandbox1SleepPodConfigs {
ginkgo.By("Deploy the sleep pod " + config.Name + " to the development namespace")
sleepObj, podErr := k8s.InitSleepPod(config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
// Wait for pod to move to running state
podErr = kClient.WaitForPodBySelectorRunning(dev,
fmt.Sprintf("app=%s", sleepRespPod.ObjectMeta.Labels["app"]),
30)
gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
}
// Deploy sleepjob4 pod in root.sandbox2
ginkgo.By("Deploy the sleep pod " + sleepPod4Config.Name + " to the development namespace")
sleepObj, podErr := k8s.InitSleepPod(sleepPod4Config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod4, err := kClient.CreatePod(sleepObj, dev)
gomega.Ω(err).NotTo(gomega.HaveOccurred())
// sleepjob4 can't be scheduled until the pods in root.sandbox1 have succeeded
ginkgo.By("The sleep pod " + sleepPod4Config.Name + " can't be scheduled")
err = kClient.WaitForPodUnschedulable(sleepRespPod4, 60*time.Second)
gomega.Ω(err).NotTo(gomega.HaveOccurred())
// the pods in root.sandbox1 run to completion without being preempted
ginkgo.By("The pods in root.sandbox1 should complete successfully")
for _, config := range sandbox1SleepPodConfigs {
err = kClient.WaitForPodSucceeded(dev, config.Name, 30*time.Second)
gomega.Ω(err).NotTo(gomega.HaveOccurred())
}
})
ginkgo.It("Verify_no_preemption_outside_fence", func() {
ginkgo.By("The preemption can't go outside the fence.")
// update config
ginkgo.By(fmt.Sprintf("Update root.sandbox1 and root.sandbox2 with guaranteed memory %dM. The root.sandbox2 has fence preemption policy.", sleepPodMemLimit))
annotation = "ann-" + common.RandSeq(10)
yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", annotation, func(sc *configs.SchedulerConfig) error {
// remove placement rules so we can control queue
sc.Partitions[0].PlacementRules = nil
var err error
if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "sandbox1",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
Properties: map[string]string{"preemption.delay": "1s"},
}); err != nil {
return err
}
if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "sandbox2",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
Properties: map[string]string{"preemption.delay": "1s", "preemption.policy": "fence"},
}); err != nil {
return err
}
return nil
})
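// Scenario: root.sandbox2 has preemption.policy "fence", so its pods may not
// preempt workloads outside the root.sandbox2 subtree. The root.sandbox2 pod
// therefore stays pending until the root.sandbox1 pods finish on their own.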
// Define sleepPod
sandbox1SleepPodConfigs := createSandbox1SleepPodConfigs(3, 30)
sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit, Time: 30, Optedout: k8s.Allow, Labels: map[string]string{"queue": "root.sandbox2"}}
// Deploy pods in root.sandbox1
for _, config := range sandbox1SleepPodConfigs {
ginkgo.By("Deploy the sleep pod " + config.Name + " to the development namespace")
sleepObj, podErr := k8s.InitSleepPod(config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
// Wait for pod to move to running state
podErr = kClient.WaitForPodBySelectorRunning(dev,
fmt.Sprintf("app=%s", sleepRespPod.ObjectMeta.Labels["app"]),
30)
gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
}
// Deploy sleepjob4 pod in root.sandbox2
ginkgo.By("Deploy the sleep pod " + sleepPod4Config.Name + " to the development namespace")
sleepObj, podErr := k8s.InitSleepPod(sleepPod4Config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod4, err := kClient.CreatePod(sleepObj, dev)
gomega.Ω(err).NotTo(gomega.HaveOccurred())
// sleepjob4's queue is fenced, so it can't preempt the root.sandbox1 pods and stays pending until they succeed
ginkgo.By("The sleep pod " + sleepPod4Config.Name + " can't be scheduled")
err = kClient.WaitForPodUnschedulable(sleepRespPod4, 60*time.Second)
gomega.Ω(err).NotTo(gomega.HaveOccurred())
// the pods in root.sandbox1 run to completion without being preempted
ginkgo.By("The pods in root.sandbox1 should complete successfully")
for _, config := range sandbox1SleepPodConfigs {
err = kClient.WaitForPodSucceeded(dev, config.Name, 30*time.Second)
gomega.Ω(err).NotTo(gomega.HaveOccurred())
}
})
ginkgo.It("Verify_preemption_on_priority_queue", func() {
ginkgo.By("A task can only preempt a task with lower or equal priority")
// update config
ginkgo.By(fmt.Sprintf("Update root.sandbox1, root.low-priority, root.high-priority with guaranteed memory %dM", sleepPodMemLimit))
annotation = "ann-" + common.RandSeq(10)
yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", annotation, func(sc *configs.SchedulerConfig) error {
// remove placement rules so we can control queue
sc.Partitions[0].PlacementRules = nil
var err error
if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "high-priority",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
Properties: map[string]string{"preemption.delay": "1s", "priority.offset": "100"},
}); err != nil {
return err
}
if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "sandbox1",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
Properties: map[string]string{"preemption.delay": "1s", "priority.offset": "0"},
}); err != nil {
return err
}
if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "low-priority",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
Properties: map[string]string{"preemption.delay": "1s", "priority.offset": "-100"},
}); err != nil {
return err
}
return nil
})
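// Scenario: priority.offset raises root.high-priority (100) above root.sandbox1 (0)
// and lowers root.low-priority (-100) below it. Only the high-priority pod may
// preempt a root.sandbox1 pod; the low-priority pod must stay pending.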
// Define sleepPod
sandbox1SleepPodConfigs := createSandbox1SleepPodConfigs(3, 30)
sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit, Time: 600, Optedout: k8s.Allow, Labels: map[string]string{"queue": "root.low-priority"}}
sleepPod5Config := k8s.SleepPodConfig{Name: "sleepjob5", NS: dev, Mem: sleepPodMemLimit, Time: 600, Optedout: k8s.Allow, Labels: map[string]string{"queue": "root.high-priority"}}
for _, config := range sandbox1SleepPodConfigs {
ginkgo.By("Deploy the sleep pod " + config.Name + " to the development namespace")
sleepObj, podErr := k8s.InitSleepPod(config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
// Wait for pod to move to running state
podErr = kClient.WaitForPodBySelectorRunning(dev,
fmt.Sprintf("app=%s", sleepRespPod.ObjectMeta.Labels["app"]),
60)
gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
}
// Deploy sleepjob4 pod in root.low-priority
ginkgo.By("Deploy the sleep pod " + sleepPod4Config.Name + " to the development namespace")
sleepObj, podErr := k8s.InitSleepPod(sleepPod4Config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod4, err := kClient.CreatePod(sleepObj, dev)
gomega.Ω(err).NotTo(gomega.HaveOccurred())
// Deploy sleepjob5 pod in root.high-priority
ginkgo.By("Deploy the sleep pod " + sleepPod5Config.Name + " to the development namespace")
sleepObj, podErr = k8s.InitSleepPod(sleepPod5Config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod5, err := kClient.CreatePod(sleepObj, dev)
gomega.Ω(err).NotTo(gomega.HaveOccurred())
// sleepjob4 (low priority) can't preempt the root.sandbox1 pods, so it can't be scheduled
ginkgo.By("The sleep pod " + sleepPod4Config.Name + " can't be scheduled")
err = kClient.WaitForPodUnschedulable(sleepRespPod4, 30*time.Second)
gomega.Ω(err).NotTo(gomega.HaveOccurred())
// sleepjob5 (high priority) can preempt a root.sandbox1 pod and gets scheduled
ginkgo.By("The sleep pod " + sleepPod5Config.Name + " can be scheduled")
err = kClient.WaitForPodScheduled(ns.Name, sleepRespPod5.Name, 30*time.Second)
gomega.Ω(err).NotTo(gomega.HaveOccurred())
// assert one of the pods in root.sandbox1 is preempted
ginkgo.By("One of the pods in root.sanbox1 is preempted")
sandbox1RunningPodsCnt := 0
pods, err := kClient.ListPodsByLabelSelector(dev, "queue=root.sandbox1")
gomega.Ω(err).NotTo(gomega.HaveOccurred())
for _, pod := range pods.Items {
if pod.DeletionTimestamp != nil {
continue
}
if pod.Status.Phase == v1.PodRunning {
sandbox1RunningPodsCnt++
}
}
Ω(sandbox1RunningPodsCnt).To(gomega.Equal(2), "One of the pods in root.sandbox1 should be preempted")
})
ginkgo.It("Verify_allow_preemption_tag", func() {
ginkgo.By("The value of 'false' for the allow preemption annotation on the PriorityClass moves the Pod to the back of the preemption list")
// update config
ginkgo.By(fmt.Sprintf("Update root.sandbox3, root.sandbox4 and root.sandbox5 with guaranteed memory %dM", sleepPodMemLimit2))
annotation = "ann-" + common.RandSeq(10)
yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", annotation, func(sc *configs.SchedulerConfig) error {
// remove placement rules so we can control queue
sc.Partitions[0].PlacementRules = nil
var err error
if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "sandbox3",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit2)}},
Properties: map[string]string{"preemption.delay": "1s"},
}); err != nil {
return err
}
if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "sandbox4",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit2)}},
Properties: map[string]string{"preemption.delay": "1s"},
}); err != nil {
return err
}
if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
Name: "sandbox5",
Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit2)}},
Properties: map[string]string{"preemption.delay": "1s"},
}); err != nil {
return err
}
return nil
})
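// Scenario: pods in root.sandbox3 use a PriorityClass that allows preemption,
// while pods in root.sandbox4 use one that opts out. When the root.sandbox5
// pod needs room, the preemption victim should therefore come from root.sandbox3.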
// Define PriorityClass
var preemptAllowPriorityClass = schedulingv1.PriorityClass{
ObjectMeta: metav1.ObjectMeta{
Name: "allow-preemption",
Annotations: map[string]string{constants.AnnotationAllowPreemption: constants.True},
},
}
var preemptNotAllowPriorityClass = schedulingv1.PriorityClass{
ObjectMeta: metav1.ObjectMeta{
Name: "preemption-not-allow",
Annotations: map[string]string{constants.AnnotationAllowPreemption: constants.False},
},
}
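// The constants.AnnotationAllowPreemption annotation ("true"/"false") on a
// PriorityClass marks whether pods using that class may be picked as
// preemption victims; "false" pushes them to the back of the victim list.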
// Create PriorityClass
ginkgo.By(fmt.Sprintf("Creating priority class %s", preemptAllowPriorityClass.Name))
_, err := kClient.CreatePriorityClass(&preemptAllowPriorityClass)
gomega.Ω(err).ShouldNot(HaveOccurred())
ginkgo.By(fmt.Sprintf("Creating priority class %s", preemptNotAllowPriorityClass.Name))
_, err = kClient.CreatePriorityClass(&preemptNotAllowPriorityClass)
gomega.Ω(err).ShouldNot(HaveOccurred())
// Define sleepPod
sleepPod1Config := k8s.SleepPodConfig{Name: "sleepjob1", NS: dev, Mem: sleepPodMemLimit2, Time: 60, Optedout: k8s.NotConfig, Labels: map[string]string{"queue": "root.sandbox3"}}
sleepPod2Config := k8s.SleepPodConfig{Name: "sleepjob2", NS: dev, Mem: sleepPodMemLimit2, Time: 60, Optedout: k8s.NotConfig, Labels: map[string]string{"queue": "root.sandbox3"}}
sleepPod3Config := k8s.SleepPodConfig{Name: "sleepjob3", NS: dev, Mem: sleepPodMemLimit2, Time: 60, Optedout: k8s.NotConfig, Labels: map[string]string{"queue": "root.sandbox4"}}
sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit2, Time: 60, Optedout: k8s.NotConfig, Labels: map[string]string{"queue": "root.sandbox4"}}
sleepPod5Config := k8s.SleepPodConfig{Name: "sleepjob5", NS: dev, Mem: sleepPodMemLimit2, Time: 600, Optedout: k8s.NotConfig, Labels: map[string]string{"queue": "root.sandbox5"}}
for _, config := range []k8s.SleepPodConfig{sleepPod1Config, sleepPod2Config, sleepPod3Config, sleepPod4Config} {
ginkgo.By("Deploy the sleep pod " + config.Name + " to the development namespace")
sleepObj, podErr := k8s.InitSleepPod(config)
// Setting PriorityClasses for Pods in a specific queue
if config.Name == "sleepjob3" || config.Name == "sleepjob4" {
sleepObj.Spec.PriorityClassName = "preemption-not-allow"
} else {
sleepObj.Spec.PriorityClassName = "allow-preemption"
}
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
// Wait for pod to move to running state
podErr = kClient.WaitForPodBySelectorRunning(dev,
fmt.Sprintf("app=%s", sleepRespPod.ObjectMeta.Labels["app"]),
60*60*2)
gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
}
// Deploy sleepjob5 pod in root.sandbox5
ginkgo.By("Deploy the sleep pod " + sleepPod5Config.Name + " to the development namespace")
sleepObj, podErr := k8s.InitSleepPod(sleepPod5Config)
Ω(podErr).NotTo(gomega.HaveOccurred())
sleepRespPod5, err := kClient.CreatePod(sleepObj, dev)
gomega.Ω(err).NotTo(gomega.HaveOccurred())
// sleepjob5 can be scheduled by preempting a pod in root.sandbox3
ginkgo.By("The sleep pod " + sleepPod5Config.Name + " can be scheduled")
err = kClient.WaitForPodScheduled(ns.Name, sleepRespPod5.Name, 30*time.Second)
gomega.Ω(err).NotTo(gomega.HaveOccurred())
// assert one of the pods in root.sandbox3 is preempted
ginkgo.By("One of the pods in root.sanbox4 is preempted")
sandbox3RunningPodsCnt := 0
pods, err := kClient.ListPodsByLabelSelector(dev, "queue=root.sandbox3")
gomega.Ω(err).NotTo(gomega.HaveOccurred())
for _, pod := range pods.Items {
if pod.DeletionTimestamp != nil {
continue
}
if pod.Status.Phase == v1.PodRunning {
sandbox3RunningPodsCnt++
}
}
Ω(sandbox3RunningPodsCnt).To(gomega.Equal(1), "One of the pods in root.sandbox3 should be preempted")
ginkgo.By(fmt.Sprintf("Removing priority class %s", preemptAllowPriorityClass.ObjectMeta.Name))
err = kClient.DeletePriorityClass(preemptAllowPriorityClass.ObjectMeta.Name)
gomega.Ω(err).ShouldNot(HaveOccurred())
ginkgo.By(fmt.Sprintf("Removing priority class %s", preemptNotAllowPriorityClass.ObjectMeta.Name))
err = kClient.DeletePriorityClass(preemptNotAllowPriorityClass.ObjectMeta.Name)
gomega.Ω(err).ShouldNot(HaveOccurred())
})
ginkgo.AfterEach(func() {
testDescription := ginkgo.CurrentSpecReport()
if testDescription.Failed() {
tests.LogTestClusterInfoWrapper(testDescription.FailureMessage(), []string{ns.Name})
tests.LogYunikornContainer(testDescription.FailureMessage())
}
ginkgo.By("Tear down namespace: " + dev)
err := kClient.TearDownNamespace(dev)
Ω(err).NotTo(HaveOccurred())
// reset config
ginkgo.By("Restoring YuniKorn configuration")
yunikorn.RestoreConfigMapWrapper(oldConfigMap, annotation)
})
})
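// createSandbox1SleepPodConfigs returns cnt sleep pod configs for the
// root.sandbox1 queue, each requesting sleepPodMemLimit memory and sleeping
// for timeSec seconds.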
func createSandbox1SleepPodConfigs(cnt, timeSec int) []k8s.SleepPodConfig {
sandbox1Configs := make([]k8s.SleepPodConfig, 0, cnt)
for i := 0; i < cnt; i++ {
sandbox1Configs = append(sandbox1Configs, k8s.SleepPodConfig{Name: fmt.Sprintf("sleepjob%d", i+1), NS: dev, Mem: sleepPodMemLimit, Time: timeSec, Optedout: k8s.Allow, Labels: map[string]string{"queue": "root.sandbox1"}})
}
return sandbox1Configs
}