blob: 55c310b8baeca5fd2fe09819d06ad18e6fe4c5e1 [file] [log] [blame]
/*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
 regarding copyright ownership.  The ASF licenses this file
 to you under the Apache License, Version 2.0 (the
 "License"); you may not use this file except in compliance
 with the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
*/
package appmgmt
import (
v1 ""
apis ""
func TestAppManagerRecoveryState(t *testing.T) {
conf.GetSchedulerConf().OperatorPlugins = "mocked-app-manager"
amProtocol := cache.NewMockedAMProtocol()
apiProvider := client.NewMockedAPIProvider(false)
amService := NewAMService(amProtocol, apiProvider)
apps, err := amService.recoverApps()
assert.NilError(t, err)
assert.Equal(t, len(apps), 2)
for appId, app := range apps {
assert.Assert(t, appId == "app01" || appId == "app02")
assert.Equal(t, app.GetApplicationState(), cache.ApplicationStates().Recovering)
func TestAppManagerRecoveryTimeout(t *testing.T) {
conf.GetSchedulerConf().OperatorPlugins = "mocked-app-manager"
amProtocol := cache.NewMockedAMProtocol()
apiProvider := client.NewMockedAPIProvider(false)
amService := NewAMService(amProtocol, apiProvider)
apps, err := amService.recoverApps()
assert.NilError(t, err)
assert.Equal(t, len(apps), 2)
err = amService.waitForAppRecovery(apps, 3*time.Second)
assert.ErrorContains(t, err, "timeout waiting for app recovery")
// TestAppManagerRecoveryExitCondition expects waitForAppRecovery to return
// without error within the 3 second timeout, after the two recovered apps
// have been put through a simulated successful recovery.
// NOTE(review): the loop over apps below has no visible body in this view, so
// the statement that simulates the successful recovery cannot be confirmed
// from here; verify it against the full file.
func TestAppManagerRecoveryExitCondition(t *testing.T) {
conf.GetSchedulerConf().OperatorPlugins = "mocked-app-manager"
amProtocol := cache.NewMockedAMProtocol()
apiProvider := client.NewMockedAPIProvider(false)
amService := NewAMService(amProtocol, apiProvider)
apps, err := amService.recoverApps()
assert.NilError(t, err)
assert.Equal(t, len(apps), 2)
// simulate app recovery succeed
for _, app := range apps {
// this should not timeout
err = amService.waitForAppRecovery(apps, 3*time.Second)
assert.NilError(t, err)
// test app state transition during recovery
func TestAppStatesDuringRecovery(t *testing.T) {
conf.GetSchedulerConf().OperatorPlugins = "mocked-app-manager"
apiProvider := client.NewMockedAPIProvider(false)
ctx := cache.NewContext(apiProvider)
cb := callback.NewAsyncRMCallback(ctx)
dispatcher.RegisterEventHandler(dispatcher.EventTypeApp, ctx.ApplicationEventHandler())
defer dispatcher.Stop()
amService := NewAMService(ctx, apiProvider)
apps, err := amService.recoverApps()
assert.NilError(t, err)
assert.Equal(t, len(apps), 2)
// when the recovery starts, all apps should be under Recovering state
app01 := ctx.GetApplication("app01")
app02 := ctx.GetApplication("app02")
// waitForAppRecovery call should be blocked
// because the scheduler is still doing recovery
err = amService.waitForAppRecovery(apps, 3*time.Second)
assert.Error(t, err, "timeout waiting for app recovery in 3s")
assert.Equal(t, app01.GetApplicationState(), cache.ApplicationStates().Recovering)
assert.Equal(t, app02.GetApplicationState(), cache.ApplicationStates().Recovering)
// mock the responses, simulate app01 has been accepted
err = cb.UpdateApplication(&si.ApplicationResponse{
Accepted: []*si.AcceptedApplication{
ApplicationID: "app01",
assert.NilError(t, err, "failed to handle UpdateResponse")
// since app02 is still under recovery
// waitForRecovery should timeout because the scheduler is still under recovery
err = amService.waitForAppRecovery(apps, 3*time.Second)
assert.Error(t, err, "timeout waiting for app recovery in 3s")
assert.Equal(t, app01.GetApplicationState(), cache.ApplicationStates().Accepted)
assert.Equal(t, app02.GetApplicationState(), cache.ApplicationStates().Recovering)
// mock the responses, simulate app02 has been accepted
err = cb.UpdateApplication(&si.ApplicationResponse{
Accepted: []*si.AcceptedApplication{
ApplicationID: "app02",
assert.NilError(t, err, "failed to handle UpdateResponse")
// the app recovery has finished,
// this should not timeout anymore
err = amService.waitForAppRecovery(apps, 3*time.Second)
assert.NilError(t, err, "the app recovery is done, error is not expected")
assert.Equal(t, app01.GetApplicationState(), cache.ApplicationStates().Accepted)
assert.Equal(t, app02.GetApplicationState(), cache.ApplicationStates().Accepted)
func TestPodsSortedDuringRecovery(t *testing.T) {
conf.GetSchedulerConf().OperatorPlugins = "mocked-app-manager"
amProtocol := cache.NewMockedAMProtocol()
taskRequests := make([]*interfaces.AddTaskRequest, 0)
amProtocol.UseAddTaskFn(func(request *interfaces.AddTaskRequest) {
taskRequests = append(taskRequests, request)
apiProvider := client.NewMockedAPIProvider(false)
amService := NewAMService(amProtocol, apiProvider)
_, err := amService.recoverApps()
assert.NilError(t, err)
assert.Equal(t, 4, len(taskRequests))
var previous int64
previous = -1
for _, req := range taskRequests {
current := req.Metadata.Pod.CreationTimestamp.Unix()
assert.Assert(t, current > previous, "Pods were not processed in sorted order")
previous = current
// mockedAppManager is a stub app manager used by the tests in this file. It
// carries no state; its lifecycle methods do nothing and report success.
type mockedAppManager struct{}

// Name returns "mocked-app-manager", the same value the tests assign to the
// scheduler conf's OperatorPlugins setting.
func (ma *mockedAppManager) Name() string {
	return "mocked-app-manager"
}

// ServiceInit does nothing and always returns nil.
func (ma *mockedAppManager) ServiceInit() error {
	return nil
}

// Start does nothing and always returns nil.
func (ma *mockedAppManager) Start() error {
	return nil
}

// Stop does nothing.
func (ma *mockedAppManager) Stop() {
	// noop
}
func (ma *mockedAppManager) ListPods() ([]*v1.Pod, error) {
pods := make([]*v1.Pod, 4)
pods[0] = newPodHelper("pod1", "task01", "app01", time.Unix(100, 0))
pods[1] = newPodHelper("pod2", "task02", "app01", time.Unix(500, 0))
pods[2] = newPodHelper("pod3", "task03", "app02", time.Unix(200, 0))
pods[3] = newPodHelper("pod4", "task04", "app02", time.Unix(300, 0))
return pods, nil
func (ma *mockedAppManager) GetExistingAllocation(pod *v1.Pod) *si.Allocation {
return nil
// newPodHelper builds a *v1.Pod test fixture with the given name, UID and
// creation timestamp. The pod is placed in namespace "yk", annotated with
// appID under constants.AnnotationApplicationID, given node name "fake-node"
// and scheduler name constants.SchedulerName, and its phase is set to
// v1.PodRunning.
// NOTE(review): the end of this literal is not visible in this view; confirm
// no further fields follow the status phase.
func newPodHelper(name, podUID, appID string, creationTimeStamp time.Time) *v1.Pod {
return &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
ObjectMeta: apis.ObjectMeta{
Name: name,
Namespace: "yk",
UID: types.UID(podUID),
// the application ID travels on the pod as an annotation
Annotations: map[string]string{
constants.AnnotationApplicationID: appID,
CreationTimestamp: apis.NewTime(creationTimeStamp),
Spec: v1.PodSpec{
NodeName: "fake-node",
SchedulerName: constants.SchedulerName,
Status: v1.PodStatus{
Phase: v1.PodRunning,