| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package cache |
| |
| import ( |
| "fmt" |
| "sort" |
| "strings" |
| "sync" |
| "testing" |
| "time" |
| |
| "github.com/apache/yunikorn-k8shim/pkg/dispatcher" |
| |
| "gotest.tools/assert" |
| is "gotest.tools/assert/cmp" |
| v1 "k8s.io/api/core/v1" |
| "k8s.io/apimachinery/pkg/api/resource" |
| apis "k8s.io/apimachinery/pkg/apis/meta/v1" |
| k8sEvents "k8s.io/client-go/tools/events" |
| |
| "github.com/apache/yunikorn-k8shim/pkg/apis/yunikorn.apache.org/v1alpha1" |
| "github.com/apache/yunikorn-k8shim/pkg/appmgmt/general" |
| "github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces" |
| "github.com/apache/yunikorn-k8shim/pkg/client" |
| "github.com/apache/yunikorn-k8shim/pkg/common" |
| "github.com/apache/yunikorn-k8shim/pkg/common/constants" |
| "github.com/apache/yunikorn-k8shim/pkg/common/events" |
| "github.com/apache/yunikorn-k8shim/pkg/common/utils" |
| "github.com/apache/yunikorn-k8shim/pkg/conf" |
| "github.com/apache/yunikorn-scheduler-interface/lib/go/api" |
| siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" |
| "github.com/apache/yunikorn-scheduler-interface/lib/go/si" |
| ) |
| |
// recorderTime is a mutex-guarded event counter used by tests that install a
// mocked event recorder: the recorder callback increments time once per
// recorded event, possibly from dispatcher goroutines.
type recorderTime struct {
	time int64         // number of events observed so far
	lock *sync.RWMutex // guards time
}
| |
| func TestNewApplication(t *testing.T) { |
| app := NewApplication("app00001", "root.queue", "testuser", map[string]string{}, newMockSchedulerAPI()) |
| assert.Equal(t, app.GetApplicationID(), "app00001") |
| assert.Equal(t, app.GetApplicationState(), ApplicationStates().New) |
| assert.Equal(t, app.partition, constants.DefaultPartition) |
| assert.Equal(t, len(app.taskMap), 0) |
| assert.Equal(t, app.GetApplicationState(), ApplicationStates().New) |
| assert.Equal(t, app.queue, "root.queue") |
| } |
| |
| func TestSubmitApplication(t *testing.T) { |
| app := NewApplication("app00001", "root.abc", "testuser", map[string]string{}, newMockSchedulerAPI()) |
| err := app.handle(NewSubmitApplicationEvent(app.applicationID)) |
| assert.NilError(t, err) |
| assertAppState(t, app, ApplicationStates().Submitted, 10*time.Second) |
| |
| // app already submitted |
| err = app.handle(NewSubmitApplicationEvent(app.applicationID)) |
| if err == nil { |
| // this should give an error |
| t.Error("expecting error got 'nil'") |
| } |
| assertAppState(t, app, ApplicationStates().Submitted, 10*time.Second) |
| } |
| |
| func TestRunApplication(t *testing.T) { |
| ms := &mockSchedulerAPI{} |
| ms.UpdateApplicationFn = func(request *si.ApplicationRequest) error { |
| assert.Equal(t, len(request.New), 1) |
| assert.Equal(t, request.New[0].ApplicationID, "app00001") |
| assert.Equal(t, request.New[0].QueueName, "root.abc") |
| return nil |
| } |
| |
| app := NewApplication("app00001", "root.abc", "testuser", map[string]string{}, ms) |
| |
| // app must be submitted before being able to run |
| err := app.handle(NewRunApplicationEvent(app.applicationID)) |
| if err == nil { |
| // this should give an error |
| t.Error("expecting error got 'nil'") |
| } |
| assertAppState(t, app, ApplicationStates().New, 3*time.Second) |
| |
| // submit the app |
| err = app.handle(NewSubmitApplicationEvent(app.applicationID)) |
| assert.NilError(t, err) |
| assertAppState(t, app, ApplicationStates().Submitted, 3*time.Second) |
| |
| // app must be accepted first |
| err = app.handle(NewRunApplicationEvent(app.applicationID)) |
| if err == nil { |
| // this should give an error |
| t.Error("expecting error got 'nil'") |
| } |
| assertAppState(t, app, ApplicationStates().Submitted, 3*time.Second) |
| } |
| |
// TestFailApplication drives two applications through the failure transitions
// (Accepted -> Failing -> Failed, then Submitted -> Failing -> Failed) and
// counts, via a mocked recorder, how many events were emitted for the tasks
// of the failing app. Failing an app in the New state must be rejected.
func TestFailApplication(t *testing.T) {
	context := initContextForTest()
	dispatcher.RegisterEventHandler(dispatcher.EventTypeApp, context.ApplicationEventHandler())
	dispatcher.Start()
	defer dispatcher.Stop()

	// inject the mocked clients to the placeholder manager
	createdPods := newThreadSafePodsMap()
	mockedAPIProvider := client.NewMockedAPIProvider(false)
	mockedAPIProvider.MockCreateFn(func(pod *v1.Pod) (*v1.Pod, error) {
		createdPods.add(pod)
		return pod, nil
	})
	mgr := NewPlaceholderManager(mockedAPIProvider.GetAPIs())
	mgr.Start()
	defer mgr.Stop()

	// rt counts recorder events; it is bumped from the recorder callback
	rt := &recorderTime{
		time: int64(0),
		lock: &sync.RWMutex{},
	}
	ms := &mockSchedulerAPI{}
	// set test mode
	conf.GetSchedulerConf().SetTestMode(true)
	// set Recorder to mocked type
	mr := events.NewMockedRecorder()
	mr.OnEventf = func() {
		rt.lock.Lock()
		defer rt.lock.Unlock()
		rt.time++
	}
	events.SetRecorder(mr)
	resources := make(map[v1.ResourceName]resource.Quantity)
	containers := make([]v1.Container, 0)
	containers = append(containers, v1.Container{
		Name: "container-01",
		Resources: v1.ResourceRequirements{
			Requests: resources,
		},
	})
	// all four tasks share the same pod spec; only the task state differs
	pod := &v1.Pod{
		TypeMeta: apis.TypeMeta{
			Kind: "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: apis.ObjectMeta{
			Name: "pod-test-00001",
			UID: "UID-00001",
		},
		Spec: v1.PodSpec{
			Containers: containers,
		},
	}
	appID := "app-test-001"
	app := NewApplication(appID, "root.abc", "testuser", map[string]string{}, ms)
	task1 := NewTask("task01", app, context, pod)
	task2 := NewTask("task02", app, context, pod)
	task3 := NewTask("task03", app, context, pod)
	task4 := NewTask("task04", app, context, pod)
	// set task states to new/pending/scheduling/running
	task1.sm.SetState(TaskStates().New)
	task2.sm.SetState(TaskStates().Pending)
	task3.sm.SetState(TaskStates().Scheduling)
	task4.sm.SetState(TaskStates().Allocated)
	app.addTask(task1)
	app.addTask(task2)
	app.addTask(task3)
	app.addTask(task4)
	app.SetState(ApplicationStates().Accepted)
	errMess := "Test Error Message"
	// first fail event: Accepted -> Failing
	err := app.handle(NewFailApplicationEvent(app.applicationID, errMess))
	assert.NilError(t, err)
	assertAppState(t, app, ApplicationStates().Failing, 3*time.Second)
	// second fail event: Failing -> Failed
	err = app.handle(NewFailApplicationEvent(app.applicationID, errMess))
	assert.NilError(t, err)
	assertAppState(t, app, ApplicationStates().Failed, 3*time.Second)
	// NOTE(review): rt.time is read/reset below without taking rt.lock;
	// presumably all events have been processed at this point, but this is
	// racy under -race — confirm
	assert.Equal(t, rt.time, int64(6))
	// reset time to 0
	rt.time = 0
	appID2 := "app-test-002"
	app2 := NewApplication(appID2, "root.abc", "testuser", map[string]string{}, ms)
	app2.SetState(ApplicationStates().New)
	// failing an app that is still in New is not a valid transition
	err = app2.handle(NewFailApplicationEvent(app2.applicationID, errMess))
	if err == nil {
		t.Error("expecting error got 'nil'")
	}
	assertAppState(t, app2, ApplicationStates().New, 3*time.Second)
	app2.SetState(ApplicationStates().Submitted)
	err = app2.handle(NewFailApplicationEvent(app2.applicationID, errMess))
	assert.NilError(t, err)
	assertAppState(t, app2, ApplicationStates().Failing, 3*time.Second)
	err = app2.handle(NewFailApplicationEvent(app2.applicationID, errMess))
	assert.NilError(t, err)
	assertAppState(t, app2, ApplicationStates().Failed, 3*time.Second)
	// app2 has no tasks, so no events should have been recorded for it
	assert.Equal(t, rt.time, int64(0))
	// Test over, set Recorder back fake type
	events.SetRecorder(k8sEvents.NewFakeRecorder(1024))
}
| |
| func TestSetUnallocatedPodsToFailedWhenFailApplication(t *testing.T) { |
| context := initContextForTest() |
| dispatcher.RegisterEventHandler(dispatcher.EventTypeApp, context.ApplicationEventHandler()) |
| dispatcher.Start() |
| defer dispatcher.Stop() |
| |
| // inject the mocked clients to the placeholder manager |
| createdPods := newThreadSafePodsMap() |
| mockedAPIProvider := client.NewMockedAPIProvider(false) |
| mockedAPIProvider.MockCreateFn(func(pod *v1.Pod) (*v1.Pod, error) { |
| createdPods.add(pod) |
| return pod, nil |
| }) |
| mgr := NewPlaceholderManager(mockedAPIProvider.GetAPIs()) |
| mgr.Start() |
| defer mgr.Stop() |
| |
| mockClient := mockedAPIProvider.GetAPIs().KubeClient |
| context.apiProvider.GetAPIs().KubeClient = mockClient |
| |
| ms := &mockSchedulerAPI{} |
| // set test mode |
| conf.GetSchedulerConf().SetTestMode(true) |
| // set Recorder to mocked type |
| mr := events.NewMockedRecorder() |
| events.SetRecorder(mr) |
| resources := make(map[v1.ResourceName]resource.Quantity) |
| containers := make([]v1.Container, 0) |
| containers = append(containers, v1.Container{ |
| Name: "container-01", |
| Resources: v1.ResourceRequirements{ |
| Requests: resources, |
| }, |
| }) |
| pod1, err := mockClient.Create(&v1.Pod{ |
| TypeMeta: apis.TypeMeta{ |
| Kind: "Pod", |
| APIVersion: "v1", |
| }, |
| ObjectMeta: apis.ObjectMeta{ |
| Name: "pod-test-00001", |
| UID: "UID-00001", |
| }, |
| Spec: v1.PodSpec{ |
| Containers: containers, |
| }, |
| }) |
| assert.NilError(t, err) |
| pod2, err := mockClient.Create(&v1.Pod{ |
| TypeMeta: apis.TypeMeta{ |
| Kind: "Pod", |
| APIVersion: "v1", |
| }, |
| ObjectMeta: apis.ObjectMeta{ |
| Name: "pod-test-00002", |
| UID: "UID-00002", |
| }, |
| Spec: v1.PodSpec{ |
| Containers: containers, |
| }, |
| }) |
| assert.NilError(t, err) |
| pod3, err := mockClient.Create(&v1.Pod{ |
| TypeMeta: apis.TypeMeta{ |
| Kind: "Pod", |
| APIVersion: "v1", |
| }, |
| ObjectMeta: apis.ObjectMeta{ |
| Name: "pod-test-00003", |
| UID: "UID-00003", |
| }, |
| Spec: v1.PodSpec{ |
| Containers: containers, |
| }, |
| }) |
| assert.NilError(t, err) |
| appID := "app-test-001" |
| app := NewApplication(appID, "root.abc", "testuser", map[string]string{}, ms) |
| task1 := NewTask("task01", app, context, pod1) |
| task2 := NewTaskPlaceholder("task02", app, context, pod2) |
| task3 := NewTask("task03", app, context, pod3) |
| task1.sm.SetState(TaskStates().Pending) |
| task2.sm.SetState(TaskStates().Scheduling) |
| task3.sm.SetState(TaskStates().Scheduling) |
| app.addTask(task1) |
| app.addTask(task2) |
| app.addTask(task3) |
| app.SetState(ApplicationStates().Accepted) |
| errMess := constants.ApplicationInsufficientResourcesFailure |
| err = app.handle(NewFailApplicationEvent(app.applicationID, errMess)) |
| assert.NilError(t, err) |
| assertAppState(t, app, ApplicationStates().Failing, 3*time.Second) |
| err = app.handle(NewFailApplicationEvent(app.applicationID, errMess)) |
| assert.NilError(t, err) |
| assertAppState(t, app, ApplicationStates().Failed, 3*time.Second) |
| |
| // Note that the status of pod 2, a placeholder pod, doesn't matter because it will be cleaned up |
| newPod1, err := mockClient.Get(pod1.Namespace, pod1.Name) |
| assert.NilError(t, err) |
| assert.Equal(t, newPod1.Status.Phase, v1.PodFailed, 3*time.Second) |
| assert.Equal(t, newPod1.Status.Reason, constants.ApplicationInsufficientResourcesFailure, 3*time.Second) |
| newPod3, err := mockClient.Get(pod3.Namespace, pod3.Name) |
| assert.NilError(t, err) |
| assert.Equal(t, newPod3.Status.Phase, v1.PodFailed, 3*time.Second) |
| assert.Equal(t, newPod3.Status.Reason, constants.ApplicationInsufficientResourcesFailure, 3*time.Second) |
| // Test over, set Recorder back fake type |
| events.SetRecorder(k8sEvents.NewFakeRecorder(1024)) |
| } |
| |
| func TestSetUnallocatedPodsToFailedWhenRejectApplication(t *testing.T) { |
| context := initContextForTest() |
| dispatcher.RegisterEventHandler(dispatcher.EventTypeApp, context.ApplicationEventHandler()) |
| dispatcher.Start() |
| defer dispatcher.Stop() |
| |
| // inject the mocked clients to the placeholder manager |
| createdPods := newThreadSafePodsMap() |
| mockedAPIProvider := client.NewMockedAPIProvider(false) |
| mockedAPIProvider.MockCreateFn(func(pod *v1.Pod) (*v1.Pod, error) { |
| createdPods.add(pod) |
| return pod, nil |
| }) |
| |
| mockClient := mockedAPIProvider.GetAPIs().KubeClient |
| context.apiProvider.GetAPIs().KubeClient = mockClient |
| mgr := NewPlaceholderManager(mockedAPIProvider.GetAPIs()) |
| mgr.Start() |
| defer mgr.Stop() |
| |
| ms := &mockSchedulerAPI{} |
| // set test mode |
| conf.GetSchedulerConf().SetTestMode(true) |
| // set Recorder to mocked type |
| mr := events.NewMockedRecorder() |
| events.SetRecorder(mr) |
| resources := make(map[v1.ResourceName]resource.Quantity) |
| containers := make([]v1.Container, 0) |
| containers = append(containers, v1.Container{ |
| Name: "container-01", |
| Resources: v1.ResourceRequirements{ |
| Requests: resources, |
| }, |
| }) |
| pod1, err := mockClient.Create(&v1.Pod{ |
| TypeMeta: apis.TypeMeta{ |
| Kind: "Pod", |
| APIVersion: "v1", |
| }, |
| ObjectMeta: apis.ObjectMeta{ |
| Name: "pod-test-00001", |
| UID: "UID-00001", |
| }, |
| Spec: v1.PodSpec{ |
| Containers: containers, |
| }, |
| }) |
| assert.NilError(t, err) |
| pod2, err := mockClient.Create(&v1.Pod{ |
| TypeMeta: apis.TypeMeta{ |
| Kind: "Pod", |
| APIVersion: "v1", |
| }, |
| ObjectMeta: apis.ObjectMeta{ |
| Name: "pod-test-00002", |
| UID: "UID-00002", |
| }, |
| Spec: v1.PodSpec{ |
| Containers: containers, |
| }, |
| }) |
| assert.NilError(t, err) |
| appID := "app-test-001" |
| app := NewApplication(appID, "root.abc", "testuser", map[string]string{}, ms) |
| task1 := NewTask("task01", app, context, pod1) |
| task2 := NewTask("task02", app, context, pod2) |
| task1.sm.SetState(TaskStates().Pending) |
| task2.sm.SetState(TaskStates().Pending) |
| app.addTask(task1) |
| app.addTask(task2) |
| app.SetState(ApplicationStates().Submitted) |
| context.AddApplication(&interfaces.AddApplicationRequest{ |
| Metadata: interfaces.ApplicationMetadata{ |
| ApplicationID: app.applicationID, |
| QueueName: app.queue, |
| User: app.user, |
| Tags: app.tags, |
| }, |
| }) |
| errMess := "app rejected" |
| err = app.handle(NewApplicationEvent(app.applicationID, RejectApplication, errMess)) |
| assert.NilError(t, err) |
| assertAppState(t, app, ApplicationStates().Rejected, 3*time.Second) |
| |
| err = app.handle(NewFailApplicationEvent(app.applicationID, |
| fmt.Sprintf("%s: %s", constants.ApplicationRejectedFailure, errMess))) |
| assert.NilError(t, err) |
| assertAppState(t, app, ApplicationStates().Failed, 3*time.Second) |
| |
| newPod1, err := mockClient.Get(pod1.Namespace, pod1.Name) |
| assert.NilError(t, err) |
| assert.Equal(t, newPod1.Status.Phase, v1.PodFailed, 3*time.Second) |
| assert.Equal(t, newPod1.Status.Reason, constants.ApplicationRejectedFailure, 3*time.Second) |
| newPod2, err := mockClient.Get(pod2.Namespace, pod2.Name) |
| assert.NilError(t, err) |
| assert.Equal(t, newPod2.Status.Phase, v1.PodFailed, 3*time.Second) |
| assert.Equal(t, newPod2.Status.Reason, constants.ApplicationRejectedFailure, 3*time.Second) |
| // Test over, set Recorder back fake type |
| events.SetRecorder(k8sEvents.NewFakeRecorder(1024)) |
| } |
| |
| func TestReleaseAppAllocation(t *testing.T) { |
| context := initContextForTest() |
| ms := &mockSchedulerAPI{} |
| resources := make(map[v1.ResourceName]resource.Quantity) |
| containers := make([]v1.Container, 0) |
| containers = append(containers, v1.Container{ |
| Name: "container-01", |
| Resources: v1.ResourceRequirements{ |
| Requests: resources, |
| }, |
| }) |
| pod := &v1.Pod{ |
| TypeMeta: apis.TypeMeta{ |
| Kind: "Pod", |
| APIVersion: "v1", |
| }, |
| ObjectMeta: apis.ObjectMeta{ |
| Name: "pod-test-00001", |
| UID: "UID-00001", |
| }, |
| Spec: v1.PodSpec{ |
| Containers: containers, |
| }, |
| } |
| appID := "app-test-001" |
| UUID := "testUUID001" |
| app := NewApplication(appID, "root.abc", "testuser", map[string]string{}, ms) |
| task := NewTask("task01", app, context, pod) |
| app.addTask(task) |
| task.allocationUUID = UUID |
| // app must be running states |
| err := app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, UUID)) |
| if err == nil { |
| // this should give an error |
| t.Error("expecting error got 'nil'") |
| } |
| // set app states to running, let event can be trigger |
| app.SetState(ApplicationStates().Running) |
| assertAppState(t, app, ApplicationStates().Running, 3*time.Second) |
| err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, UUID)) |
| assert.NilError(t, err) |
| // after handle release event the states of app must be running |
| assertAppState(t, app, ApplicationStates().Running, 3*time.Second) |
| } |
| |
| func newMockSchedulerAPI() *mockSchedulerAPI { |
| return &mockSchedulerAPI{ |
| registerFn: func(request *si.RegisterResourceManagerRequest, callback api.ResourceManagerCallback) (response *si.RegisterResourceManagerResponse, e error) { |
| return nil, nil |
| }, |
| UpdateAllocationFn: func(request *si.AllocationRequest) error { |
| return nil |
| }, |
| UpdateApplicationFn: func(request *si.ApplicationRequest) error { |
| return nil |
| }, |
| UpdateNodeFn: func(request *si.NodeRequest) error { |
| return nil |
| }, |
| UpdateConfigurationFn: func(clusterID string) error { |
| return nil |
| }, |
| } |
| } |
| |
// mockSchedulerAPI is a test double for the scheduler-interface API: each
// interface method delegates to the corresponding overridable Fn field so
// individual tests can hook and inspect scheduler interactions.
type mockSchedulerAPI struct {
	callback api.ResourceManagerCallback //nolint:structcheck,unused
	// registerFn handles RegisterResourceManager calls
	registerFn func(request *si.RegisterResourceManagerRequest,
		callback api.ResourceManagerCallback) (*si.RegisterResourceManagerResponse, error)
	UpdateAllocationFn func(request *si.AllocationRequest) error
	UpdateApplicationFn func(request *si.ApplicationRequest) error
	UpdateNodeFn func(request *si.NodeRequest) error
	UpdateConfigurationFn func(clusterID string) error
}
| |
// RegisterResourceManager delegates to the configured registerFn hook.
func (ms *mockSchedulerAPI) RegisterResourceManager(request *si.RegisterResourceManagerRequest,
	callback api.ResourceManagerCallback) (*si.RegisterResourceManagerResponse, error) {
	return ms.registerFn(request, callback)
}
| |
// UpdateAllocation delegates to the configured UpdateAllocationFn hook.
// NOTE(review): panics if the hook was never set — callers must populate it.
func (ms *mockSchedulerAPI) UpdateAllocation(request *si.AllocationRequest) error {
	return ms.UpdateAllocationFn(request)
}
| |
// UpdateApplication delegates to the configured UpdateApplicationFn hook.
// NOTE(review): panics if the hook was never set — callers must populate it.
func (ms *mockSchedulerAPI) UpdateApplication(request *si.ApplicationRequest) error {
	return ms.UpdateApplicationFn(request)
}
| |
// UpdateNode delegates to the configured UpdateNodeFn hook.
// NOTE(review): panics if the hook was never set — callers must populate it.
func (ms *mockSchedulerAPI) UpdateNode(request *si.NodeRequest) error {
	return ms.UpdateNodeFn(request)
}
| |
| func (ms *mockSchedulerAPI) UpdateConfiguration(clusterID string) error { |
| return nil |
| } |
| |
| func assertAppState(t *testing.T, app *Application, expectedState string, duration time.Duration) { |
| deadline := time.Now().Add(duration) |
| for { |
| if app.sm.Current() == expectedState { |
| return |
| } |
| |
| if time.Now().After(deadline) { |
| t.Fatalf("timeout waiting for app %s reach to state %s", app.applicationID, expectedState) |
| } |
| } |
| } |
| |
// TestGetNonTerminatedTaskAlias checks that getNonTerminatedTaskAlias returns
// the namespace/name alias of every task that is not in a terminated state:
// none for an empty app, both tasks while pending, none once both are
// rejected, and only the live one in a mixed terminated/non-terminated case.
func TestGetNonTerminatedTaskAlias(t *testing.T) {
	context := initContextForTest()
	appID := "app00001"
	app := NewApplication(appID, "root.a", "testuser", map[string]string{}, newMockSchedulerAPI())
	context.applications[appID] = app
	// app doesn't have any task
	res := app.getNonTerminatedTaskAlias()
	assert.Equal(t, len(res), 0)

	// the pods have no namespace set, so aliases are "/<name>"
	pod1 := &v1.Pod{
		TypeMeta: apis.TypeMeta{
			Kind: "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: apis.ObjectMeta{
			Name: "test-00001",
			UID: "UID-00001",
		},
	}
	pod2 := &v1.Pod{
		TypeMeta: apis.TypeMeta{
			Kind: "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: apis.ObjectMeta{
			Name: "test-00002",
			UID: "UID-00002",
		},
	}
	// set two task to non-terminated states
	taskID1 := "task01"
	task1 := NewTask(taskID1, app, context, pod1)
	app.taskMap[taskID1] = task1
	task1.sm.SetState(TaskStates().Pending)
	taskID2 := "task02"
	task2 := NewTask(taskID2, app, context, pod2)
	app.taskMap[taskID2] = task2
	task2.sm.SetState(TaskStates().Pending)
	// check the tasks both in non-terminated states
	// res should return both task's alias
	res = app.getNonTerminatedTaskAlias()
	assert.Equal(t, len(res), 2)
	assert.Assert(t, is.Contains(res, "/test-00001"))
	assert.Assert(t, is.Contains(res, "/test-00002"))

	// set two tasks to terminated states
	task1.sm.SetState(TaskStates().Rejected)
	task2.sm.SetState(TaskStates().Rejected)
	// check the tasks both in terminated states
	// res should return empty
	res = app.getNonTerminatedTaskAlias()
	assert.Equal(t, len(res), 0)

	// set two tasks to one is terminated, another is non-terminated
	task1.sm.SetState(TaskStates().Rejected)
	task2.sm.SetState(TaskStates().Allocated)
	// check the task, should only return task2's alias
	res = app.getNonTerminatedTaskAlias()
	assert.Equal(t, len(res), 1)
	assert.Equal(t, res[0], "/test-00002")
}
| |
// TestSetTaskGroupsAndSchedulingPolicy checks the gang-scheduling setters:
// the scheduling policy (type and parameters) and the task group definitions
// round-trip through the app, and the aggregated placeholder ask is the sum
// of MinResource * MinMember over all task groups.
func TestSetTaskGroupsAndSchedulingPolicy(t *testing.T) {
	app := NewApplication("app01", "root.a", "test-user", map[string]string{}, newMockSchedulerAPI())
	// both are empty before anything has been set
	assert.Assert(t, app.getSchedulingPolicy().Type == "")
	assert.Equal(t, len(app.getTaskGroups()), 0)

	app.setSchedulingPolicy(v1alpha1.SchedulingPolicy{
		Type: v1alpha1.TryReserve,
		Parameters: map[string]string{
			"option-1": "value-1",
			"option-2": "value-2",
		},
	})

	assert.Equal(t, app.getSchedulingPolicy().Type, v1alpha1.TryReserve)
	assert.Equal(t, len(app.getSchedulingPolicy().Parameters), 2)
	assert.Equal(t, app.getSchedulingPolicy().Parameters["option-1"], "value-1", "incorrect parameter value")
	assert.Equal(t, app.getSchedulingPolicy().Parameters["option-2"], "value-2", "incorrect parameter value")

	// two groups: group-1 is plain, group-2 carries node selector + toleration
	duration := int64(3000)
	app.setTaskGroups([]v1alpha1.TaskGroup{
		{
			Name: "test-group-1",
			MinMember: 10,
			MinResource: map[string]resource.Quantity{
				v1.ResourceCPU.String(): resource.MustParse("500m"),
				v1.ResourceMemory.String(): resource.MustParse("500Mi"),
			},
		},
		{
			Name: "test-group-2",
			MinMember: 20,
			MinResource: map[string]resource.Quantity{
				v1.ResourceCPU.String(): resource.MustParse("1000m"),
				v1.ResourceMemory.String(): resource.MustParse("1000Mi"),
			},
			NodeSelector: map[string]string{
				"locate": "west",
			},
			Tolerations: []v1.Toleration{
				{
					Key: "nodeType",
					Operator: v1.TolerationOpEqual,
					Value: "infra",
					Effect: v1.TaintEffectNoSchedule,
					TolerationSeconds: &duration,
				},
			},
		},
	})

	assert.Assert(t, app.getTaskGroups() != nil)
	assert.Equal(t, len(app.getTaskGroups()), 2)

	// sort the slice to give us a stable order
	sort.Slice(app.getTaskGroups(), func(i, j int) bool {
		return strings.Compare(app.getTaskGroups()[i].Name, app.getTaskGroups()[j].Name) < 0
	})

	tg1 := app.getTaskGroups()[0]
	assert.Equal(t, tg1.Name, "test-group-1")
	assert.Equal(t, tg1.MinMember, int32(10))
	assert.Equal(t, tg1.MinResource[v1.ResourceCPU.String()], resource.MustParse("500m"))
	assert.Equal(t, tg1.MinResource[v1.ResourceMemory.String()], resource.MustParse("500Mi"))

	tg2 := app.getTaskGroups()[1]
	assert.Equal(t, tg2.Name, "test-group-2")
	assert.Equal(t, tg2.MinMember, int32(20))
	assert.Equal(t, len(tg2.Tolerations), 1)
	assert.Equal(t, tg2.Tolerations[0].Key, "nodeType")
	assert.Equal(t, tg2.Tolerations[0].Value, "infra")
	assert.Equal(t, tg2.Tolerations[0].Operator, v1.TolerationOpEqual)
	assert.Equal(t, tg2.Tolerations[0].Effect, v1.TaintEffectNoSchedule)
	assert.Equal(t, tg2.Tolerations[0].TolerationSeconds, &duration)

	// TG1: 500Mi, 10 members -> total 5000Mi
	// TG2: 1000Mi, 20 members -> total 20000Mi
	// overall usage 5000Mi + 20000Mi = 25000Mi. This will also be the queue usage so the correct handling
	// CPU is normal as it specifies milli cpu to start with
	expectedPlaceholderAsk := common.NewResourceBuilder().AddResource(siCommon.Memory, 25000*1024*1024).AddResource(siCommon.CPU, 25000).Build()
	actualPlaceholderAsk := app.getPlaceholderAsk()
	assert.DeepEqual(t, actualPlaceholderAsk, expectedPlaceholderAsk)
}
| |
// threadSafePodsMap is a pod registry keyed by pod name, guarded by the
// embedded RWMutex so it can be written from mocked create callbacks that
// run on placeholder-manager goroutines while tests read the count.
type threadSafePodsMap struct {
	pods map[string]*v1.Pod
	sync.RWMutex
}
| |
| func newThreadSafePodsMap() *threadSafePodsMap { |
| return &threadSafePodsMap{ |
| pods: make(map[string]*v1.Pod), |
| } |
| } |
| |
| func (t *threadSafePodsMap) add(pod *v1.Pod) { |
| t.Lock() |
| defer t.Unlock() |
| t.pods[pod.Name] = pod |
| } |
| |
| func (t *threadSafePodsMap) count() int { |
| t.RLock() |
| defer t.RUnlock() |
| return len(t.pods) |
| } |
| |
// TestTryReserve verifies the gang-scheduling reservation flow: an accepted
// app with task groups and the TryReserve policy enters the Reserving state
// on schedule, and the placeholder manager creates one placeholder pod per
// task-group member (10 + 20 = 30 in total).
func TestTryReserve(t *testing.T) {
	context := initContextForTest()
	dispatcher.RegisterEventHandler(dispatcher.EventTypeApp, context.ApplicationEventHandler())
	dispatcher.Start()
	defer dispatcher.Stop()

	// inject the mocked clients to the placeholder manager
	createdPods := newThreadSafePodsMap()
	mockedAPIProvider := client.NewMockedAPIProvider(false)
	mockedAPIProvider.MockCreateFn(func(pod *v1.Pod) (*v1.Pod, error) {
		createdPods.add(pod)
		return pod, nil
	})
	mgr := NewPlaceholderManager(mockedAPIProvider.GetAPIs())
	mgr.Start()
	defer mgr.Stop()

	// create a new app
	app := NewApplication("app00001", "root.abc", "test-user",
		map[string]string{}, mockedAPIProvider.GetAPIs().SchedulerAPI)
	context.applications[app.applicationID] = app

	// set app scheduling policy
	app.setSchedulingPolicy(v1alpha1.SchedulingPolicy{
		Type: v1alpha1.TryReserve,
		Parameters: map[string]string{
			"option-1": "value-1",
			"option-2": "value-2",
		},
	})

	// set taskGroups
	app.setTaskGroups([]v1alpha1.TaskGroup{
		{
			Name: "test-group-1",
			MinMember: 10,
			MinResource: map[string]resource.Quantity{
				v1.ResourceCPU.String(): resource.MustParse("500m"),
				v1.ResourceMemory.String(): resource.MustParse("500Mi"),
			},
		},
		{
			Name: "test-group-2",
			MinMember: 20,
			MinResource: map[string]resource.Quantity{
				v1.ResourceCPU.String(): resource.MustParse("1000m"),
				v1.ResourceMemory.String(): resource.MustParse("1000Mi"),
			},
		},
	})

	// submit the app
	assert.Assert(t, app.sm != nil)
	err := app.handle(NewSubmitApplicationEvent(app.applicationID))
	assert.NilError(t, err)
	assertAppState(t, app, ApplicationStates().Submitted, 3*time.Second)

	// accepted the app
	err = app.handle(NewSimpleApplicationEvent(app.GetApplicationID(), AcceptApplication))
	assert.NilError(t, err)

	// run app schedule
	app.Schedule()

	// since this app has taskGroups defined,
	// once the app is accepted, it is expected to see this app goes to Reserving state
	assertAppState(t, app, ApplicationStates().Reserving, 3*time.Second)

	// under Reserving state, the app will need to acquire all the placeholders it asks for
	err = utils.WaitForCondition(func() bool {
		return createdPods.count() == 30
	}, 100*time.Millisecond, 3*time.Second)
	assert.NilError(t, err, "placeholders are not created")
}
| |
// TestTryReservePostRestart simulates a scheduler restart: when an app with
// task groups is recovered with at least one already-Allocated task, the
// Reserving stage must be skipped (the app goes straight to Running) and no
// placeholder pods may be created.
func TestTryReservePostRestart(t *testing.T) {
	context := initContextForTest()
	dispatcher.RegisterEventHandler(dispatcher.EventTypeApp, context.ApplicationEventHandler())
	dispatcher.Start()
	defer dispatcher.Stop()

	// inject the mocked clients to the placeholder manager
	createdPods := newThreadSafePodsMap()
	mockedAPIProvider := client.NewMockedAPIProvider(false)
	mockedAPIProvider.MockCreateFn(func(pod *v1.Pod) (*v1.Pod, error) {
		createdPods.add(pod)
		return pod, nil
	})
	mgr := NewPlaceholderManager(mockedAPIProvider.GetAPIs())
	mgr.Start()
	defer mgr.Stop()

	// create a new app
	app := NewApplication("app00001", "root.abc", "test-user",
		map[string]string{}, mockedAPIProvider.GetAPIs().SchedulerAPI)
	context.applications[app.applicationID] = app

	// set taskGroups
	app.setTaskGroups([]v1alpha1.TaskGroup{
		{
			Name: "test-group-1",
			MinMember: 10,
			MinResource: map[string]resource.Quantity{
				v1.ResourceCPU.String(): resource.MustParse("500m"),
				v1.ResourceMemory.String(): resource.MustParse("500Mi"),
			},
		},
		{
			Name: "test-group-2",
			MinMember: 20,
			MinResource: map[string]resource.Quantity{
				v1.ResourceCPU.String(): resource.MustParse("1000m"),
				v1.ResourceMemory.String(): resource.MustParse("1000Mi"),
			},
		},
	})

	// submit the app
	err := app.handle(NewSubmitApplicationEvent(app.applicationID))
	assert.NilError(t, err)
	assertAppState(t, app, ApplicationStates().Submitted, 3*time.Second)

	// accepted the app
	err = app.handle(NewSimpleApplicationEvent(app.GetApplicationID(), AcceptApplication))
	assert.NilError(t, err)

	// simulate some tasks are recovered during the restart
	// create 3 pods, 1 of them is Allocated and the other 2 are New
	resources := make(map[v1.ResourceName]resource.Quantity)
	containers := make([]v1.Container, 0)
	containers = append(containers, v1.Container{
		Name: "container-01",
		Resources: v1.ResourceRequirements{
			Requests: resources,
		},
	})
	task0 := NewTask("task00", app, context, &v1.Pod{
		TypeMeta: apis.TypeMeta{
			Kind: "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: apis.ObjectMeta{
			Name: "pod-test-00000",
			UID: "UID-00000",
		},
		Spec: v1.PodSpec{
			Containers: containers,
		},
	})
	task1 := NewTask("task01", app, context, &v1.Pod{
		TypeMeta: apis.TypeMeta{
			Kind: "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: apis.ObjectMeta{
			Name: "pod-test-00001",
			UID: "UID-00001",
		},
		Spec: v1.PodSpec{
			Containers: containers,
		},
	})
	task2 := NewTask("task02", app, context, &v1.Pod{
		TypeMeta: apis.TypeMeta{
			Kind: "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: apis.ObjectMeta{
			Name: "pod-test-00002",
			UID: "UID-00002",
		},
		Spec: v1.PodSpec{
			Containers: containers,
		},
	})
	// marking task0 allocated is what makes the app skip the Reserving stage
	task0.setAllocated("fake-host", string(task0.pod.UID))
	app.addTask(task0)
	app.addTask(task1)
	app.addTask(task2)

	// there should be 1 Allocated task, i.e task0
	// there should be 2 New tasks, i.e task1 and task2
	assert.Equal(t, len(app.getTasks(TaskStates().Allocated)), 1)
	assert.Equal(t, len(app.getTasks(TaskStates().New)), 2)

	// run app schedule
	app.Schedule()

	// since this app has Allocated tasks, the Reserving state will be skipped
	assertAppState(t, app, ApplicationStates().Running, 3*time.Second)

	// verify there will be no placeholders created
	time.Sleep(time.Second)
	assert.Equal(t, createdPods.count(), 0)
}
| |
// TestTriggerAppRecovery checks that TriggerAppRecovery moves a New app into
// Recovering and sends a fully-populated ApplicationRequest to the scheduler
// (placeholder ask, timeout, tags, gang style, user), and that recovery is
// rejected once the app has left the New state.
func TestTriggerAppRecovery(t *testing.T) {
	// Trigger app recovery should be successful if the app is in New state
	mockScheduler := newMockSchedulerAPI()
	// capture the request sent to the scheduler for inspection below
	var savedAppRequest *si.ApplicationRequest
	mockScheduler.UpdateApplicationFn = func(request *si.ApplicationRequest) error {
		savedAppRequest = request
		return nil
	}

	app := NewApplication("app00001", "root.abc", "test-user",
		map[string]string{}, mockScheduler)
	app.placeholderAsk = &si.Resource{
		Resources: map[string]*si.Quantity{
			"memory": {Value: 100},
		},
	}
	app.placeholderTimeoutInSec = 1
	app.tags = map[string]string{
		"testkey": "testvalue",
	}
	app.schedulingStyle = "soft"

	err := app.TriggerAppRecovery()
	assert.NilError(t, err)
	assert.Equal(t, app.GetApplicationState(), ApplicationStates().Recovering)
	assert.Assert(t, savedAppRequest != nil, "update function was not called")
	assert.Equal(t, 1, len(savedAppRequest.New))
	// every field set on the app above must appear in the outgoing request
	appRequest := savedAppRequest.New[0]
	assert.Assert(t, appRequest.PlaceholderAsk != nil, "PlaceholderAsk is not set")
	assert.Equal(t, appRequest.PlaceholderAsk.Resources["memory"].Value, int64(100))
	assert.Equal(t, "app00001", appRequest.ApplicationID)
	assert.Equal(t, appRequest.QueueName, "root.abc")
	assert.Equal(t, appRequest.PartitionName, "default")
	assert.Equal(t, appRequest.ExecutionTimeoutMilliSeconds, int64(1000))
	assert.Equal(t, appRequest.GangSchedulingStyle, "soft")
	assert.Assert(t, appRequest.Tags != nil, "Tags are not set")
	assert.Equal(t, appRequest.Tags["testkey"], "testvalue")
	assert.Assert(t, appRequest.Ugi != nil, "Ugi is not set")
	assert.Equal(t, appRequest.Ugi.User, "test-user")

	// Trigger app recovery should be failed if the app already leaves New state
	app = NewApplication("app00001", "root.abc", "test-user",
		map[string]string{}, newMockSchedulerAPI())
	err = app.handle(NewSubmitApplicationEvent(app.applicationID))
	assert.NilError(t, err)
	assertAppState(t, app, ApplicationStates().Submitted, 3*time.Second)
	err = app.TriggerAppRecovery()
	assert.ErrorContains(t, err, "event RecoverApplication inappropriate in current state Submitted")
}
| |
| func TestSkipReservationStage(t *testing.T) { |
| context := initContextForTest() |
| app := NewApplication("app00001", "root.queue", "test-user", map[string]string{}, newMockSchedulerAPI()) |
| app.addTask(NewTask("task0001", app, context, &v1.Pod{})) |
| skip := app.skipReservationStage() |
| assert.Equal(t, skip, true, "expected to skip reservation because there is no task groups defined") |
| |
| // app has task groups defined, and contains 2 tasks, 1 Pending and 1 Allocated |
| // expect: skip reservation |
| app = NewApplication("app00001", "root.queue", "test-user", map[string]string{}, newMockSchedulerAPI()) |
| task1 := NewTask("task0001", app, context, &v1.Pod{}) |
| task1.sm.SetState(TaskStates().New) |
| task2 := NewTask("task0002", app, context, &v1.Pod{}) |
| task2.sm.SetState(TaskStates().Allocated) |
| app.addTask(task1) |
| app.addTask(task2) |
| app.setTaskGroups([]v1alpha1.TaskGroup{ |
| { |
| Name: "test-group-1", |
| MinMember: 10, |
| MinResource: map[string]resource.Quantity{ |
| v1.ResourceCPU.String(): resource.MustParse("500m"), |
| v1.ResourceMemory.String(): resource.MustParse("500Mi"), |
| }, |
| }}, |
| ) |
| skip = app.skipReservationStage() |
| assert.Equal(t, skip, true, "expected to skip reservation because there is task in Allocated state") |
| |
| // app has task groups defined, and contains 2 tasks, both are New |
| // expect: do not skip reservation |
| app = NewApplication("app00001", "root.queue", "test-user", map[string]string{}, newMockSchedulerAPI()) |
| task1 = NewTask("task0001", app, context, &v1.Pod{}) |
| task1.sm.SetState(TaskStates().New) |
| task2 = NewTask("task0002", app, context, &v1.Pod{}) |
| task2.sm.SetState(TaskStates().New) |
| app.addTask(task1) |
| app.addTask(task2) |
| app.setTaskGroups([]v1alpha1.TaskGroup{ |
| { |
| Name: "test-group-1", |
| MinMember: 10, |
| MinResource: map[string]resource.Quantity{ |
| v1.ResourceCPU.String(): resource.MustParse("500m"), |
| v1.ResourceMemory.String(): resource.MustParse("500Mi"), |
| }, |
| }}, |
| ) |
| skip = app.skipReservationStage() |
| assert.Equal(t, skip, false, "expected not to skip reservation") |
| } |
| |
| func TestReleaseAppAllocationInFailingState(t *testing.T) { |
| context := initContextForTest() |
| ms := &mockSchedulerAPI{} |
| resources := make(map[v1.ResourceName]resource.Quantity) |
| containers := make([]v1.Container, 0) |
| containers = append(containers, v1.Container{ |
| Name: "container-01", |
| Resources: v1.ResourceRequirements{ |
| Requests: resources, |
| }, |
| }) |
| pod := &v1.Pod{ |
| TypeMeta: apis.TypeMeta{ |
| Kind: "Pod", |
| APIVersion: "v1", |
| }, |
| ObjectMeta: apis.ObjectMeta{ |
| Name: "pod-test-00001", |
| UID: "UID-00001", |
| }, |
| Spec: v1.PodSpec{ |
| Containers: containers, |
| }, |
| } |
| appID := "app-test-001" |
| UUID := "testUUID001" |
| app := NewApplication(appID, "root.abc", "testuser", map[string]string{}, ms) |
| task := NewTask("task01", app, context, pod) |
| app.addTask(task) |
| task.allocationUUID = UUID |
| // app must be running states |
| err := app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, UUID)) |
| if err == nil { |
| // this should give an error |
| t.Error("expecting error got 'nil'") |
| } |
| // set app states to running, let event can be trigger |
| app.SetState(ApplicationStates().Running) |
| assertAppState(t, app, ApplicationStates().Running, 3*time.Second) |
| err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, UUID)) |
| assert.NilError(t, err) |
| // after handle release event the states of app must be running |
| assertAppState(t, app, ApplicationStates().Running, 3*time.Second) |
| app.SetState(ApplicationStates().Failing) |
| err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, UUID)) |
| assert.NilError(t, err) |
| // after handle release event the states of app must be failing |
| assertAppState(t, app, ApplicationStates().Failing, 3*time.Second) |
| |
| errMess := "Test Error Message" |
| err = app.handle(NewFailApplicationEvent(app.applicationID, errMess)) |
| assert.NilError(t, err) |
| // after handle fail event the states of app must be failed |
| assertAppState(t, app, ApplicationStates().Failed, 3*time.Second) |
| } |
| |
| func TestResumingStateTransitions(t *testing.T) { |
| context := initContextForTest() |
| dispatcher.RegisterEventHandler(dispatcher.EventTypeApp, context.ApplicationEventHandler()) |
| dispatcher.Start() |
| defer dispatcher.Stop() |
| |
| // inject the mocked clients to the placeholder manager |
| createdPods := newThreadSafePodsMap() |
| mockedAPIProvider := client.NewMockedAPIProvider(false) |
| mockedAPIProvider.MockCreateFn(func(pod *v1.Pod) (*v1.Pod, error) { |
| createdPods.add(pod) |
| return pod, nil |
| }) |
| mgr := NewPlaceholderManager(mockedAPIProvider.GetAPIs()) |
| mgr.Start() |
| defer mgr.Stop() |
| |
| // create a new app |
| app := NewApplication("app00001", "root.abc", "test-user", |
| map[string]string{}, mockedAPIProvider.GetAPIs().SchedulerAPI) |
| task1 := NewTask("task0001", app, context, &v1.Pod{}) |
| task1.sm.SetState(TaskStates().New) |
| task2 := NewTask("task0002", app, context, &v1.Pod{}) |
| task2.sm.SetState(TaskStates().Allocated) |
| |
| // Add tasks |
| app.addTask(task1) |
| app.addTask(task2) |
| UUID := "testUUID001" |
| task1.allocationUUID = UUID |
| context.applications[app.applicationID] = app |
| |
| // Set app state to "reserving" |
| app.SetState(ApplicationStates().Reserving) |
| |
| // Fire ResumingApplicationEvent for state change from "reserving" to "resuming" |
| err := app.handle(NewResumingApplicationEvent(app.applicationID)) |
| assert.NilError(t, err) |
| assertAppState(t, app, ApplicationStates().Resuming, 3*time.Second) |
| |
| // Set 1st task status alone to "completed" |
| event1 := NewSimpleTaskEvent(app.applicationID, task1.taskID, CompleteTask) |
| err = task1.handle(event1) |
| assert.NilError(t, err, "failed to handle CompleteTask event") |
| assert.Equal(t, task1.GetTaskState(), TaskStates().Completed) |
| |
| // Still app state is "resuming" |
| assertAppState(t, app, ApplicationStates().Resuming, 3*time.Second) |
| |
| // Setting 2nd task status also to "completed". Now, app state changes from "resuming" to "running" |
| event2 := NewSimpleTaskEvent(app.applicationID, task2.taskID, CompleteTask) |
| err = task2.handle(event2) |
| assert.NilError(t, err, "failed to handle CompleteTask event") |
| assert.Equal(t, task2.GetTaskState(), TaskStates().Completed) |
| |
| err = app.handle(NewSimpleApplicationEvent(app.applicationID, AppTaskCompleted)) |
| assert.NilError(t, err) |
| assertAppState(t, app, ApplicationStates().Running, 3*time.Second) |
| } |
| |
| func TestGetPlaceholderTasks(t *testing.T) { |
| context := initContextForTest() |
| app := NewApplication(appID, "root.a", "testuser", map[string]string{}, newMockSchedulerAPI()) |
| task1 := NewTask("task0001", app, context, &v1.Pod{}) |
| task1.placeholder = true |
| task2 := NewTask("task0002", app, context, &v1.Pod{}) |
| task2.placeholder = true |
| task3 := NewTask("task0003", app, context, &v1.Pod{}) |
| |
| app.addTask(task1) |
| app.addTask(task2) |
| app.addTask(task3) |
| |
| phTasks := app.GetPlaceHolderTasks() |
| assert.Equal(t, 2, len(phTasks)) |
| phTasksMap := map[string]bool{ |
| phTasks[0].GetTaskID(): true, |
| phTasks[1].GetTaskID(): true, |
| } |
| |
| assert.Assert(t, phTasksMap["task0001"]) |
| assert.Assert(t, phTasksMap["task0002"]) |
| } |
| |
| func TestPlaceholderTimeoutEvents(t *testing.T) { |
| context := initContextForTest() |
| recorder, ok := events.GetRecorder().(*k8sEvents.FakeRecorder) |
| if !ok { |
| t.Fatal("the EventRecorder is expected to be of type FakeRecorder") |
| } |
| |
| amprotocol := NewMockedAMProtocol() |
| podEvent := general.NewPodEventHandler(amprotocol, false) |
| |
| am := general.NewManager(client.NewMockedAPIProvider(false), podEvent) |
| pod1 := v1.Pod{ |
| TypeMeta: apis.TypeMeta{ |
| Kind: "Pod", |
| APIVersion: "v1", |
| }, |
| ObjectMeta: apis.ObjectMeta{ |
| Name: "pod00001", |
| Namespace: "default", |
| UID: "UID-POD-00001", |
| Labels: map[string]string{ |
| "queue": "root.a", |
| "applicationId": "app00001", |
| }, |
| }, |
| Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, |
| Status: v1.PodStatus{ |
| Phase: v1.PodPending, |
| }, |
| } |
| |
| // add a pending pod through the AM service |
| am.AddPod(&pod1) |
| |
| pod := &v1.Pod{ |
| TypeMeta: apis.TypeMeta{ |
| Kind: "Pod", |
| APIVersion: "v1", |
| }, |
| ObjectMeta: apis.ObjectMeta{ |
| Name: "pod00002", |
| Namespace: "default", |
| UID: "UID-POD-00002", |
| Labels: map[string]string{ |
| "queue": "root.a", |
| "applicationId": "app00001", |
| }, |
| }, |
| Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, |
| Status: v1.PodStatus{ |
| Phase: v1.PodPending, |
| }, |
| } |
| managedApp := amprotocol.GetApplication("app00001") |
| assert.Assert(t, managedApp != nil) |
| app, valid := managedApp.(*Application) |
| if !valid { |
| t.Fatal("application is expected to be of type Application") |
| } |
| assert.Equal(t, valid, true) |
| assert.Equal(t, app.GetApplicationID(), "app00001") |
| assert.Equal(t, app.GetApplicationState(), ApplicationStates().New) |
| assert.Equal(t, app.GetQueue(), "root.a") |
| assert.Equal(t, len(app.GetNewTasks()), 1) |
| |
| appID := "app00001" |
| UUID := "UID-POD-00002" |
| |
| context.applications[appID] = app |
| task1 := context.AddTask(&interfaces.AddTaskRequest{ |
| Metadata: interfaces.TaskMetadata{ |
| ApplicationID: "app00001", |
| TaskID: "task02", |
| Pod: pod, |
| Placeholder: true, |
| }, |
| }) |
| assert.Assert(t, task1 != nil) |
| assert.Equal(t, task1.GetTaskID(), "task02") |
| |
| _, taskErr := app.GetTask("task02") |
| assert.NilError(t, taskErr, "Task should exist") |
| |
| task2, task2Err := task1.(*Task) |
| if !task2Err { |
| // this should give an error |
| t.Error("task1 is expected to be of type Task") |
| } |
| task2.allocationUUID = UUID |
| |
| // app must be running states |
| err := app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, UUID)) |
| assert.Error(t, err, "event ReleaseAppAllocation inappropriate in current state New") |
| |
| // set app states to running, let event can be trigger |
| app.SetState(ApplicationStates().Running) |
| assertAppState(t, app, ApplicationStates().Running, 3*time.Second) |
| err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, UUID)) |
| assert.NilError(t, err) |
| // after handle release event the states of app must be running |
| assertAppState(t, app, ApplicationStates().Running, 3*time.Second) |
| |
| message := "Placeholder timed out" |
| reason := "placeholder has been timed out" |
| // check that the event has been published |
| err = utils.WaitForCondition(func() bool { |
| for { |
| select { |
| case event := <-recorder.Events: |
| if strings.Contains(event, reason) && strings.Contains(event, message) { |
| return true |
| } |
| default: |
| return false |
| } |
| } |
| }, 5*time.Millisecond, 20*time.Millisecond) |
| assert.NilError(t, err, "event should have been emitted") |
| } |