blob: 2f6f3b6c3b30b1989f0f2b6ce1c1e535d52124d8 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"testing"
"gotest.tools/assert"
v1 "k8s.io/api/core/v1"
apis "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
const (
Host1 = "HOST1"
Host2 = "HOST2"
HostEmpty = ""
)
func TestUpdatePod(t *testing.T) {
mockedSchedulerApi := newMockSchedulerAPI()
nodes := newSchedulerNodes(mockedSchedulerApi, NewTestSchedulerCache())
host1 := utils.NodeForTest(Host1, "10G", "10")
host2 := utils.NodeForTest(Host2, "10G", "10")
nodes.addNode(host1)
nodes.addNode(host2)
coordinator := newNodeResourceCoordinator(nodes)
// pod is not assigned to any node
// this won't trigger an update
pod1 := utils.PodForTest("pod1", "1G", "500m")
pod2 := utils.PodForTest("pod1", "1G", "500m")
pod1.Status.Phase = v1.PodPending
pod1.Status.Phase = v1.PodPending
pod1.Spec.NodeName = ""
pod2.Spec.NodeName = ""
mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
t.Fatalf("update should not run because state is not changed")
return nil
}
coordinator.updatePod(pod1, pod2)
// pod is already assigned to a node and state is running
// this won't trigger an update
pod1.Status.Phase = v1.PodRunning
pod1.Status.Phase = v1.PodRunning
pod1.Spec.NodeName = Host1
pod2.Spec.NodeName = Host1
mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
t.Fatalf("update should not run because state is not changed")
return nil
}
coordinator.updatePod(pod1, pod2)
// pod state remains in Pending, pod is assigned to a node
// this happens when the pod just gets allocated and started, but not in running state yet
// trigger an update
pod1.Status.Phase = v1.PodPending
pod2.Status.Phase = v1.PodPending
pod1.Spec.NodeName = HostEmpty
pod2.Spec.NodeName = Host1
executed := false
mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
executed = true
assert.Equal(t, len(request.Nodes), 1)
updatedNode := request.Nodes[0]
assert.Equal(t, updatedNode.NodeID, Host1)
assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
assert.Equal(t, updatedNode.SchedulableResource.Resources[constants.Memory].Value, int64(10000*1000*1000))
assert.Equal(t, updatedNode.SchedulableResource.Resources[constants.CPU].Value, int64(10000))
assert.Equal(t, updatedNode.OccupiedResource.Resources[constants.Memory].Value, int64(1000*1000*1000))
assert.Equal(t, updatedNode.OccupiedResource.Resources[constants.CPU].Value, int64(500))
return nil
}
coordinator.updatePod(pod1, pod2)
assert.Assert(t, executed)
// pod state changed from running to failed, pod terminated
// trigger another update
pod1.Status.Phase = v1.PodRunning
pod2.Status.Phase = v1.PodFailed
pod1.Spec.NodeName = Host1
pod2.Spec.NodeName = Host1
executed = false
mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
executed = true
assert.Equal(t, len(request.Nodes), 1)
updatedNode := request.Nodes[0]
assert.Equal(t, updatedNode.NodeID, Host1)
assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
assert.Equal(t, updatedNode.SchedulableResource.Resources[constants.Memory].Value, int64(10000*1000*1000))
assert.Equal(t, updatedNode.SchedulableResource.Resources[constants.CPU].Value, int64(10000))
assert.Equal(t, updatedNode.OccupiedResource.Resources[constants.Memory].Value, int64(0))
assert.Equal(t, updatedNode.OccupiedResource.Resources[constants.CPU].Value, int64(0))
return nil
}
coordinator.updatePod(pod1, pod2)
assert.Assert(t, executed)
// pod state changed from pending to running
// this is not triggering a new update because the pod was already allocated to a node
pod1.Status.Phase = v1.PodPending
pod2.Status.Phase = v1.PodRunning
pod1.Spec.NodeName = Host2
pod2.Spec.NodeName = Host2
mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
t.Fatalf("update should not run because pod is already allocated")
return nil
}
coordinator.updatePod(pod1, pod2)
// pod state Running, pod is assigned to a node
// trigger an update
pod1.Status.Phase = v1.PodRunning
pod2.Status.Phase = v1.PodRunning
pod1.Spec.NodeName = ""
pod2.Spec.NodeName = Host2
executed = false
mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
executed = true
assert.Equal(t, len(request.Nodes), 1)
updatedNode := request.Nodes[0]
assert.Equal(t, updatedNode.NodeID, Host2)
assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
assert.Equal(t, updatedNode.SchedulableResource.Resources[constants.Memory].Value, int64(10000*1000*1000))
assert.Equal(t, updatedNode.SchedulableResource.Resources[constants.CPU].Value, int64(10000))
assert.Equal(t, updatedNode.OccupiedResource.Resources[constants.Memory].Value, int64(1000*1000*1000))
assert.Equal(t, updatedNode.OccupiedResource.Resources[constants.CPU].Value, int64(500))
return nil
}
coordinator.updatePod(pod1, pod2)
assert.Assert(t, executed)
// pod state Running to Succeed, pod terminated
// this should trigger an update
pod1.Status.Phase = v1.PodRunning
pod2.Status.Phase = v1.PodSucceeded
pod1.Spec.NodeName = Host2
pod2.Spec.NodeName = Host2
executed = false
mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
executed = true
assert.Equal(t, len(request.Nodes), 1)
updatedNode := request.Nodes[0]
assert.Equal(t, updatedNode.NodeID, Host2)
assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
assert.Equal(t, updatedNode.SchedulableResource.Resources[constants.Memory].Value, int64(10000*1000*1000))
assert.Equal(t, updatedNode.SchedulableResource.Resources[constants.CPU].Value, int64(10000))
assert.Equal(t, updatedNode.OccupiedResource.Resources[constants.Memory].Value, int64(0))
assert.Equal(t, updatedNode.OccupiedResource.Resources[constants.CPU].Value, int64(0))
return nil
}
coordinator.updatePod(pod1, pod2)
assert.Assert(t, executed)
// pod gets assigned to a node, but pod status is already failed
// this should not trigger an update
pod1.Status.Phase = v1.PodFailed
pod2.Status.Phase = v1.PodFailed
pod1.Spec.NodeName = HostEmpty
pod2.Spec.NodeName = Host2
mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
t.Fatalf("update should not run because pod is already allocated")
return nil
}
coordinator.updatePod(pod1, pod2)
}
func TestDeletePod(t *testing.T) {
mockedSchedulerApi := newMockSchedulerAPI()
nodes := newSchedulerNodes(mockedSchedulerApi, NewTestSchedulerCache())
host1 := utils.NodeForTest(Host1, "10G", "10")
nodes.addNode(host1)
coordinator := newNodeResourceCoordinator(nodes)
// pod from pending to running
// occupied resources should be added to the node
pod1 := utils.PodForTest("pod1", "1G", "500m")
pod2 := utils.PodForTest("pod1", "1G", "500m")
pod1.Status.Phase = v1.PodPending
pod2.Status.Phase = v1.PodRunning
pod1.Spec.NodeName = HostEmpty
pod2.Spec.NodeName = Host1
executed := false
mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
executed = true
assert.Equal(t, len(request.Nodes), 1)
updatedNode := request.Nodes[0]
assert.Equal(t, updatedNode.NodeID, Host1)
assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
assert.Equal(t, updatedNode.SchedulableResource.Resources[constants.Memory].Value, int64(10000*1000*1000))
assert.Equal(t, updatedNode.SchedulableResource.Resources[constants.CPU].Value, int64(10000))
assert.Equal(t, updatedNode.OccupiedResource.Resources[constants.Memory].Value, int64(1000*1000*1000))
assert.Equal(t, updatedNode.OccupiedResource.Resources[constants.CPU].Value, int64(500))
return nil
}
coordinator.updatePod(pod1, pod2)
assert.Assert(t, executed)
// delete pod from the running state
executed = false
mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
executed = true
assert.Equal(t, len(request.Nodes), 1)
updatedNode := request.Nodes[0]
assert.Equal(t, updatedNode.NodeID, Host1)
assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
assert.Equal(t, updatedNode.SchedulableResource.Resources[constants.Memory].Value, int64(10000))
assert.Equal(t, updatedNode.SchedulableResource.Resources[constants.CPU].Value, int64(10000))
assert.Equal(t, updatedNode.OccupiedResource.Resources[constants.Memory].Value, int64(0))
assert.Equal(t, updatedNode.OccupiedResource.Resources[constants.CPU].Value, int64(0))
return nil
}
coordinator.deletePod(pod1)
}
func TestDeleteTerminatedPod(t *testing.T) {
mockedSchedulerApi := newMockSchedulerAPI()
nodes := newSchedulerNodes(mockedSchedulerApi, NewTestSchedulerCache())
host1 := utils.NodeForTest(Host1, "10G", "10")
nodes.addNode(host1)
coordinator := newNodeResourceCoordinator(nodes)
// pod from pending to running
// occupied resources should be added to the node
pod1 := utils.PodForTest("pod1", "1G", "500m")
pod2 := utils.PodForTest("pod1", "1G", "500m")
pod1.Status.Phase = v1.PodPending
pod2.Status.Phase = v1.PodRunning
pod1.Spec.NodeName = HostEmpty
pod2.Spec.NodeName = Host1
executed := false
mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
executed = true
assert.Equal(t, len(request.Nodes), 1)
updatedNode := request.Nodes[0]
assert.Equal(t, updatedNode.NodeID, Host1)
assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
assert.Equal(t, updatedNode.SchedulableResource.Resources[constants.Memory].Value, int64(10000*1000*1000))
assert.Equal(t, updatedNode.SchedulableResource.Resources[constants.CPU].Value, int64(10000))
assert.Equal(t, updatedNode.OccupiedResource.Resources[constants.Memory].Value, int64(1000*1000*1000))
assert.Equal(t, updatedNode.OccupiedResource.Resources[constants.CPU].Value, int64(500))
return nil
}
coordinator.updatePod(pod1, pod2)
assert.Assert(t, executed)
// pod from running to succeed
// occupied resources should be added to the node
pod1.Status.Phase = v1.PodRunning
pod2.Status.Phase = v1.PodSucceeded
pod1.Spec.NodeName = Host1
pod2.Spec.NodeName = Host1
executed = false
mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
executed = true
assert.Equal(t, len(request.Nodes), 1)
updatedNode := request.Nodes[0]
assert.Equal(t, updatedNode.NodeID, Host1)
assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
assert.Equal(t, updatedNode.SchedulableResource.Resources[constants.Memory].Value, int64(10000*1000*1000))
assert.Equal(t, updatedNode.SchedulableResource.Resources[constants.CPU].Value, int64(10000))
assert.Equal(t, updatedNode.OccupiedResource.Resources[constants.Memory].Value, int64(0))
assert.Equal(t, updatedNode.OccupiedResource.Resources[constants.CPU].Value, int64(0))
return nil
}
coordinator.updatePod(pod1, pod2)
assert.Assert(t, executed)
// delete pod from the succeed state
executed = false
mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
executed = true
t.Fatalf("update should not be triggered as it should have already done")
return nil
}
coordinator.deletePod(pod2)
assert.Equal(t, executed, false)
}
func TestNodeCoordinatorFilterPods(t *testing.T) {
mockedSchedulerAPI := newMockSchedulerAPI()
nodes := newSchedulerNodes(mockedSchedulerAPI, NewTestSchedulerCache())
host1 := utils.NodeForTest(Host1, "10G", "10")
nodes.addNode(host1)
coordinator := newNodeResourceCoordinator(nodes)
pod1 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "yunikorn-test-00001",
UID: "UID-00001",
},
Spec: v1.PodSpec{SchedulerName: "yunikorn"},
}
pod2 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "yunikorn-test-00002",
UID: "UID-00002",
},
Spec: v1.PodSpec{SchedulerName: "default-scheduler"},
}
pod3 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "yunikorn-test-00003",
UID: "UID-00003",
Labels: map[string]string{"applicationId": "test-00003"},
},
Spec: v1.PodSpec{SchedulerName: "yunikorn"},
}
assert.Check(t, !coordinator.filterPods(nil), "nil object was allowed")
assert.Check(t, coordinator.filterPods(pod1), "yunikorn-managed pod with no app id was filtered")
assert.Check(t, coordinator.filterPods(pod2), "non-yunikorn-managed pod was filtered")
assert.Check(t, !coordinator.filterPods(pod3), "yunikorn-managed pod was allowed")
}