blob: 2a68c9c6d909ac41d392fd27c10ea804622eba9c [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
"testing"
"time"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
apis "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/apache/yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1"
"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
)
const (
appID = "app01"
queue = "root.default"
namespace = "test"
)
func TestNewPlaceholderManager(t *testing.T) {
// the shim does not startup if it cannot get a kubeclient, no need to check for nil
mockedAPIProvider := client.NewMockedAPIProvider(false)
mgr := NewPlaceholderManager(mockedAPIProvider.GetAPIs())
assert.Equal(t, mgr.running.Load(), false, "new manager should not run")
if mgr.orphanPods == nil && len(mgr.orphanPods) != 0 {
t.Fatal("orphanPods map should be initialised and empty")
}
if mgr.stopChan == nil {
t.Fatal("stop channel should be initialised")
}
}
func TestCreateAppPlaceholders(t *testing.T) {
app := createAppWIthTaskGroupForTest()
mockedAPIProvider := client.NewMockedAPIProvider(false)
createdPods := createAndCheckPlaceholderCreate(mockedAPIProvider, app, t)
for _, pod := range createdPods {
assert.Assert(t, len(pod.OwnerReferences) == 0, "By default the pod should not have owner reference set")
}
// simulate placeholder creation failures
// failed to create one placeholder
mockedAPIProvider.MockCreateFn(func(pod *v1.Pod) (*v1.Pod, error) {
if pod.Name == "tg-test-group-2-app01-15" {
return nil, fmt.Errorf("failed to create pod %s", pod.Name)
}
return pod, nil
})
err := placeholderMgr.createAppPlaceholders(app)
assert.Error(t, err, "failed to create pod tg-test-group-2-app01-15")
}
func TestCreateAppPlaceholdersWithExistingPods(t *testing.T) {
createdPods := make(map[string]*v1.Pod)
mockedAPIProvider := client.NewMockedAPIProvider(false)
mockedAPIProvider.MockCreateFn(func(pod *v1.Pod) (*v1.Pod, error) {
createdPods[pod.Name] = pod
return pod, nil
})
placeholderMgr = NewPlaceholderManager(mockedAPIProvider.GetAPIs())
app := createAppWIthTaskGroupAndPodsForTest()
err := placeholderMgr.createAppPlaceholders(app)
assert.NilError(t, err)
assert.Equal(t, 27, len(createdPods))
assert.Equal(t, (*v1.Pod)(nil), createdPods["tg-test-group-1-app01-0"], "Pod should not have been created")
assert.Equal(t, (*v1.Pod)(nil), createdPods["tg-test-group-1-app01-1"], "Pod should not have been created")
assert.Equal(t, (*v1.Pod)(nil), createdPods["tg-test-group-1-app02-0"], "Pod should not have been created")
}
func createAndCheckPlaceholderCreate(mockedAPIProvider *client.MockedAPIProvider, app *Application, t *testing.T) map[string]*v1.Pod {
createdPods := make(map[string]*v1.Pod)
mockedAPIProvider.MockCreateFn(func(pod *v1.Pod) (*v1.Pod, error) {
createdPods[pod.Name] = pod
return pod, nil
})
placeholderMgr := NewPlaceholderManager(mockedAPIProvider.GetAPIs())
err := placeholderMgr.createAppPlaceholders(app)
assert.NilError(t, err, "create app placeholders should be successful")
assert.Equal(t, len(createdPods), 30)
return createdPods
}
func TestCreateAppPlaceholdersWithOwnReference(t *testing.T) {
app := createAppWIthTaskGroupForTest()
controller := true
ownRef := apis.OwnerReference{
Name: "JobId",
UID: "JobUid",
Controller: &controller,
}
app.setPlaceholderOwnerReferences([]apis.OwnerReference{ownRef})
mockedAPIProvider := client.NewMockedAPIProvider(false)
pods := createAndCheckPlaceholderCreate(mockedAPIProvider, app, t)
for _, pod := range pods {
assert.Assert(t, len(pod.OwnerReferences) == 1, "The pod should have exactly one owner reference set")
assert.Assert(t, *pod.OwnerReferences[0].Controller == false, "The owner reference should not be a controller")
assert.Equal(t, pod.OwnerReferences[0].Name, ownRef.Name, "The owner reference name does not match")
assert.Equal(t, pod.OwnerReferences[0].UID, ownRef.UID, "The owner reference UID does not match")
}
}
func createAppWIthTaskGroupForTest() *Application {
mockedSchedulerAPI := newMockSchedulerAPI()
app := NewApplication(appID, queue,
"bob", map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI)
app.setTaskGroups([]v1alpha1.TaskGroup{
{
Name: "test-group-1",
MinMember: 10,
MinResource: map[string]resource.Quantity{
"cpu": resource.MustParse("500m"),
"memory": resource.MustParse("1024M"),
},
},
{
Name: "test-group-2",
MinMember: 20,
MinResource: map[string]resource.Quantity{
"cpu": resource.MustParse("1000m"),
"memory": resource.MustParse("2048M"),
},
},
})
return app
}
func createAppWIthTaskGroupAndPodsForTest() *Application {
app := createAppWIthTaskGroupForTest()
mockedContext := initContextForTest()
pod1 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "tg-test-group-1-app01-0",
UID: "UID-01",
},
}
pod2 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "tg-test-group-1-app01-1",
UID: "UID-02",
},
}
pod3 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "tg-test-group-2-app01-0",
UID: "UID-03",
},
}
taskID1 := "task1-01"
task1 := NewTask(taskID1, app, mockedContext, pod1)
task1.placeholder = true
task1.pod = pod1
app.taskMap[taskID1] = task1
taskID2 := "task1-02"
task2 := NewTask(taskID2, app, mockedContext, pod2)
task2.placeholder = true
task2.pod = pod2
app.taskMap[taskID2] = task2
taskID3 := "task2-01"
task3 := NewTask(taskID3, app, mockedContext, pod3)
task3.placeholder = true
task3.pod = pod3
app.taskMap[taskID3] = task3
return app
}
func TestCleanUp(t *testing.T) {
mockedContext := initContextForTest()
mockedSchedulerAPI := newMockSchedulerAPI()
app := NewApplication(appID, queue,
"bob", map[string]string{constants.AppTagNamespace: namespace}, mockedSchedulerAPI)
mockedContext.applications[appID] = app
res := app.getNonTerminatedTaskAlias()
assert.Equal(t, len(res), 0)
pod1 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "pod-01",
UID: "UID-01",
},
}
pod2 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "pod-02",
UID: "UID-02",
},
}
pod3 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "pod-03",
UID: "UID-03",
},
}
taskID1 := "task01"
task1 := NewTask(taskID1, app, mockedContext, pod1)
task1.placeholder = true
app.taskMap[taskID1] = task1
taskID2 := "task02"
task2 := NewTask(taskID2, app, mockedContext, pod2)
task2.placeholder = true
app.taskMap[taskID2] = task2
taskID3 := "task03"
task3 := NewTask(taskID3, app, mockedContext, pod3)
task3.placeholder = false
app.taskMap[taskID3] = task3
res = app.getNonTerminatedTaskAlias()
assert.Equal(t, len(res), 3)
deletePod := make([]string, 0)
mockedAPIProvider := client.NewMockedAPIProvider(false)
mockedAPIProvider.MockDeleteFn(func(pod *v1.Pod) error {
deletePod = append(deletePod, pod.Name)
return nil
})
placeholderMgr := NewPlaceholderManager(mockedAPIProvider.GetAPIs())
placeholderMgr.cleanUp(app)
// check both pod-01 and pod-02 in deletePod list and pod-03 isn't contain
assert.Assert(t, is.Contains(deletePod, "pod-01"))
assert.Assert(t, is.Contains(deletePod, "pod-02"))
exist := false
for _, item := range deletePod {
if item == "pod-03" {
exist = true
}
}
assert.Equal(t, exist, false)
assert.Equal(t, len(placeholderMgr.orphanPods), 0)
}
func TestCleanOrphanPlaceholders(t *testing.T) {
mockedAPIProvider := client.NewMockedAPIProvider(false)
placeholderMgr := NewPlaceholderManager(mockedAPIProvider.GetAPIs())
pod1 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "pod-01",
UID: "UID-01",
},
}
placeholderMgr.orphanPods["task01"] = pod1
assert.Equal(t, len(placeholderMgr.orphanPods), 1)
placeholderMgr.cleanOrphanPlaceholders()
assert.Equal(t, len(placeholderMgr.orphanPods), 0)
}
func TestPlaceholderManagerStartStop(t *testing.T) {
mockedAPIProvider := client.NewMockedAPIProvider(false)
mgr := NewPlaceholderManager(mockedAPIProvider.GetAPIs())
assert.Equal(t, mgr.running.Load(), false, "new manager should not run")
// start clean up goroutine
mgr.Start()
assert.Equal(t, mgr.isRunning(), true, "manager should be running after start")
// starting a second time should do nothing
mgr.Start()
assert.Equal(t, mgr.isRunning(), true, "sending start 2nd time should not do anything")
// this is a blocking call
mgr.Stop()
// allow time for processing as the call can return faster than the flag is set
time.Sleep(5 * time.Millisecond)
// check manager has stopped
assert.Equal(t, mgr.isRunning(), false, "placeholder manager has not stopped")
// this should not block anymore, flag should be set
mgr.Stop()
assert.Equal(t, mgr.isRunning(), false, "stopping already stopped manager failed")
// make sure stop doesn't do anything on a non running manager
mgr = NewPlaceholderManager(mockedAPIProvider.GetAPIs())
assert.Equal(t, mgr.running.Load(), false, "new manager should not run")
mgr.Stop()
assert.Equal(t, mgr.isRunning(), false, "stopping new manager: nothing should happen")
mgr.Start()
assert.Equal(t, mgr.isRunning(), true, "manager should be running after start (stop start sequence)")
// lets stop it again now things should stop correctly
mgr.Stop()
// allow time for processing as the call can return faster than the flag is set
time.Sleep(5 * time.Millisecond)
assert.Equal(t, mgr.isRunning(), false, "placeholder manager has not stopped")
}
func TestPlaceholderManagerCleanup(t *testing.T) {
mockedAPIProvider := client.NewMockedAPIProvider(false)
pod1 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "pod-01",
UID: "UID-01",
},
}
pod2 := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "pod-02",
UID: "UID-02",
},
}
mgr := NewPlaceholderManager(mockedAPIProvider.GetAPIs())
mgr.setCleanupTime(100 * time.Millisecond)
defer mgr.setCleanupTime(5 * time.Second)
mgr.Start()
assert.Equal(t, mgr.isRunning(), true, "manager should be running after start")
mgr.orphanPods["task01"] = pod1
mgr.orphanPods["task02"] = pod2
assert.Equal(t, mgr.getOrphanPodsLength(), 2)
<-time.After(100 * time.Millisecond)
time.Sleep(5 * time.Millisecond)
assert.Equal(t, mgr.getOrphanPodsLength(), 0)
mgr.Stop()
time.Sleep(5 * time.Millisecond)
assert.Equal(t, mgr.isRunning(), false, "placeholder manager has stopped")
}