| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package objects |
| |
| import ( |
| "fmt" |
| "math" |
| "testing" |
| "time" |
| |
| "gotest.tools/v3/assert" |
| |
| "github.com/apache/yunikorn-core/pkg/common" |
| "github.com/apache/yunikorn-core/pkg/common/configs" |
| "github.com/apache/yunikorn-core/pkg/common/resources" |
| "github.com/apache/yunikorn-core/pkg/common/security" |
| "github.com/apache/yunikorn-core/pkg/events" |
| "github.com/apache/yunikorn-core/pkg/events/mock" |
| "github.com/apache/yunikorn-core/pkg/handler" |
| "github.com/apache/yunikorn-core/pkg/rmproxy" |
| "github.com/apache/yunikorn-core/pkg/rmproxy/rmevent" |
| "github.com/apache/yunikorn-core/pkg/scheduler/ugm" |
| siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" |
| "github.com/apache/yunikorn-scheduler-interface/lib/go/si" |
| ) |
| |
| var ( |
| nilNodeIterator = func() NodeIterator { |
| return nil |
| } |
| nilGetNode = func(string) *Node { |
| return nil |
| } |
| ) |
| |
| func setupUGM() { |
| userManager := ugm.GetUserManager() |
| userManager.ClearUserTrackers() |
| userManager.ClearGroupTrackers() |
| } |
| |
| // basic app creating with timeout checks |
| func TestNewApplication(t *testing.T) { |
| user := security.UserGroup{ |
| User: "testuser", |
| Groups: []string{}, |
| } |
| siApp := &si.AddApplicationRequest{} |
| app := NewApplication(siApp, user, nil, "") |
| assert.Equal(t, app.ApplicationID, "", "application ID should not be set was not set in SI") |
| assert.Equal(t, app.GetQueuePath(), "", "queue name should not be set was not set in SI") |
| assert.Equal(t, app.Partition, "", "partition name should not be set was not set in SI") |
| assert.Equal(t, app.rmID, "", "RM ID should not be set was not passed in") |
| assert.Equal(t, app.rmEventHandler, handler.EventHandler(nil), "event handler should be nil") |
| // just check one of the resources... |
| assert.Assert(t, resources.IsZero(app.placeholderAsk), "placeholder ask should be zero") |
| assert.Assert(t, app.IsNew(), "new application must be in new state") |
| // with the basics check the one thing that can really change |
| assert.Equal(t, app.execTimeout, defaultPlaceholderTimeout, "No timeout passed in should be default") |
| siApp.ExecutionTimeoutMilliSeconds = -1 |
| app = NewApplication(siApp, user, nil, "") |
| assert.Equal(t, app.execTimeout, defaultPlaceholderTimeout, "Negative timeout passed in should be default") |
| siApp.ExecutionTimeoutMilliSeconds = math.MaxInt64 |
| app = NewApplication(siApp, user, nil, "") |
| assert.Equal(t, app.execTimeout, defaultPlaceholderTimeout, "overly large timeout should be set to default") |
| siApp.ExecutionTimeoutMilliSeconds = 60000 |
| app = NewApplication(siApp, user, nil, "") |
| assert.Equal(t, app.execTimeout, 60*time.Second, "correct timeout should not set") |
| |
| originalPhTimeout := defaultPlaceholderTimeout |
| defaultPlaceholderTimeout = 100 * time.Microsecond |
| defer func() { defaultPlaceholderTimeout = originalPhTimeout }() |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) |
| siApp = &si.AddApplicationRequest{ |
| ApplicationID: "appID", |
| QueueName: "some.queue", |
| PartitionName: "AnotherPartition", |
| ExecutionTimeoutMilliSeconds: 0, |
| PlaceholderAsk: &si.Resource{Resources: map[string]*si.Quantity{"first": {Value: 1}}}, |
| } |
| app = NewApplication(siApp, user, rmproxy.NewMockedRMProxy(), "myRM") |
| assert.Equal(t, app.ApplicationID, "appID", "application ID should not be set to SI value") |
| assert.Equal(t, app.GetQueuePath(), "some.queue", "queue name should not be set to SI value") |
| assert.Equal(t, app.Partition, "AnotherPartition", "partition name should be set to SI value") |
| if app.rmEventHandler == nil { |
| t.Fatal("non nil handler was not set in the new app") |
| } |
| assertUserGroupResource(t, getTestUserGroup(), nil) |
| assert.Assert(t, app.IsNew(), "new application must be in new state") |
| assert.Equal(t, app.execTimeout, defaultPlaceholderTimeout, "no timeout passed in should be modified default") |
| assert.Assert(t, resources.Equals(app.placeholderAsk, res), "placeholder ask not set as expected") |
| |
| // valid tags |
| siApp = &si.AddApplicationRequest{} |
| siApp.Tags = map[string]string{ |
| siCommon.AppTagNamespaceResourceQuota: "{\"resources\":{\"validMaxRes\":{\"value\":11}}}", |
| siCommon.AppTagNamespaceResourceGuaranteed: "{\"resources\":{\"validGuaranteed\":{\"value\":22}}}", |
| } |
| app = NewApplication(siApp, user, nil, "") |
| guaranteed := app.GetGuaranteedResource() |
| maxResource := app.GetMaxResource() |
| assert.Assert(t, guaranteed != nil, "guaranteed resource has not been set") |
| assert.Equal(t, 1, len(guaranteed.Resources), "more than one resource has been set") |
| assert.Equal(t, resources.Quantity(22), guaranteed.Resources["validGuaranteed"]) |
| assert.Assert(t, maxResource != nil, "maximum resource has not been set") |
| assert.Equal(t, 1, len(maxResource.Resources), "more than one resource has been set") |
| assert.Equal(t, resources.Quantity(11), maxResource.Resources["validMaxRes"], "maximum resource is incorrect") |
| |
| // invalid tags |
| siApp = &si.AddApplicationRequest{} |
| siApp.Tags = map[string]string{ |
| siCommon.AppTagNamespaceResourceQuota: "{xxxxxx}", |
| siCommon.AppTagNamespaceResourceGuaranteed: "{yyyyy}", |
| } |
| app = NewApplication(siApp, user, nil, "") |
| guaranteed = app.GetGuaranteedResource() |
| maxResource = app.GetMaxResource() |
| assert.Assert(t, guaranteed == nil, "guaranteed resource should have not been set") |
| assert.Assert(t, maxResource == nil, "maximum resource should have not been set") |
| |
| // negative values |
| siApp = &si.AddApplicationRequest{} |
| siApp.Tags = map[string]string{ |
| siCommon.AppTagNamespaceResourceQuota: "{\"resources\":{\"negativeMax\":{\"value\":-11}}}", |
| siCommon.AppTagNamespaceResourceGuaranteed: "{\"resources\":{\"negativeGuaranteed\":{\"value\":-22}}}", |
| } |
| app = NewApplication(siApp, user, nil, "") |
| guaranteed = app.GetGuaranteedResource() |
| maxResource = app.GetMaxResource() |
| assert.Assert(t, guaranteed == nil, "guaranteed resource should have not been set") |
| assert.Assert(t, maxResource == nil, "maximum resource should have not been set") |
| } |
| |
| // test basic reservations |
| func TestAppReservation(t *testing.T) { |
| app := newApplication(appID1, "default", "root.unknown") |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| if app.HasReserved() { |
| t.Fatal("new app should not have reservations") |
| } |
| if app.IsReservedOnNode("") { |
| t.Error("app should not have reservations for empty node ID") |
| } |
| if app.IsReservedOnNode("unknown") { |
| t.Error("new app should not have reservations for unknown node") |
| } |
| |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| |
| // reserve illegal request |
| err = app.Reserve(nil, nil) |
| if err == nil { |
| t.Errorf("illegal reservation requested but did not fail") |
| } |
| |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15}) |
| ask := newAllocationAsk(aKey, appID1, res) |
| node := newNode(nodeID1, map[string]resources.Quantity{"first": 10}) |
| |
| // too large for node |
| err = app.Reserve(node, ask) |
| if err == nil { |
| t.Errorf("requested reservation does not fit in node resource but did not fail") |
| } |
| |
| res = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) |
| ask = newAllocationAsk(aKey, appID1, res) |
| app = newApplication(appID1, "default", "root.unknown") |
| app.queue = queue |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been added to app") |
| // reserve that works |
| err = app.Reserve(node, ask) |
| assert.NilError(t, err, "reservation should not have failed") |
| if app.IsReservedOnNode("") { |
| t.Errorf("app should not have reservations for empty node ID") |
| } |
| if app.IsReservedOnNode("unknown") { |
| t.Error("app should not have reservations for unknown node") |
| } |
| if app.HasReserved() && !app.IsReservedOnNode(nodeID1) { |
| t.Errorf("app should have reservations for node %s", nodeID1) |
| } |
| |
| // node name similarity check: chop of the last char to make sure we check the full name |
| similar := nodeID1[:len(nodeID1)-1] |
| if app.HasReserved() && app.IsReservedOnNode(similar) { |
| t.Errorf("similar app should not have reservations for node %s", similar) |
| } |
| |
| // reserve the same reservation |
| err = app.Reserve(node, ask) |
| if err == nil { |
| t.Errorf("reservation should have failed") |
| } |
| |
| // unreserve unknown node/ask |
| _, err = app.UnReserve(nil, nil) |
| if err == nil { |
| t.Errorf("illegal reservation release but did not fail") |
| } |
| |
| // 2nd reservation for app |
| ask2 := newAllocationAsk("alloc-2", appID1, res) |
| node2 := newNode("node-2", map[string]resources.Quantity{"first": 10}) |
| err = app.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "ask2 should have been added to app") |
| err = app.Reserve(node, ask2) |
| if err == nil { |
| t.Errorf("reservation of node by second ask should have failed") |
| } |
| err = app.Reserve(node2, ask2) |
| assert.NilError(t, err, "reservation of 2nd node should not have failed") |
| _, err = app.UnReserve(node2, ask2) |
| assert.NilError(t, err, "remove of reservation of 2nd node should not have failed") |
| |
| // unreserve the same should fail |
| _, err = app.UnReserve(node2, ask2) |
| assert.NilError(t, err, "remove twice of reservation of 2nd node should have failed") |
| |
| // failure case: remove reservation from node, app still needs cleanup |
| var num int |
| num, err = node.unReserve(app, ask) |
| assert.NilError(t, err, "un-reserve on node should not have failed") |
| assert.Equal(t, num, 1, "un-reserve on node should have removed reservation") |
| num, err = app.UnReserve(node, ask) |
| assert.NilError(t, err, "app has reservation should not have failed") |
| assert.Equal(t, num, 1, "un-reserve on app should have removed reservation from app") |
| } |
| |
| // test multiple reservations from one allocation |
| func TestAppAllocReservation(t *testing.T) { |
| app := newApplication(appID1, "default", "root.unknown") |
| // Create event system after new application to avoid new application event. |
| events.Init() |
| eventSystem := events.GetEventSystem().(*events.EventSystemImpl) //nolint:errcheck |
| eventSystem.StartServiceWithPublisher(false) |
| app.disableStateChangeEvents() |
| app.resetAppEvents() |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| if app.HasReserved() { |
| t.Fatal("new app should not have reservations") |
| } |
| if len(app.GetAskReservations("")) != 0 { |
| t.Fatal("new app should not have reservation for empty allocKey") |
| } |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| |
| // reserve 1 allocate ask |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) |
| ask := newAllocationAsk(aKey, appID1, res) |
| ask2 := newAllocationAsk(aKey2, appID1, res) |
| node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 10}) |
| // reserve that works |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been added to app") |
| err = app.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "ask2 should have been added to app") |
| err = app.Reserve(node1, ask) |
| assert.NilError(t, err, "reservation should not have failed") |
| if len(app.GetAskReservations("")) != 0 { |
| t.Fatal("app should not have reservation for empty allocKey") |
| } |
| nodeKey1 := nodeID1 + "|" + aKey |
| askReserved := app.GetAskReservations(aKey) |
| if len(askReserved) != 1 || askReserved[0] != nodeKey1 { |
| t.Errorf("app should have reservations for %s on %s and has not", aKey, nodeID1) |
| } |
| |
| nodeID2 := "node-2" |
| node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 10}) |
| err = app.Reserve(node2, ask2) |
| assert.NilError(t, err, "reservation should not have failed: error %v", err) |
| nodeKey2 := nodeID2 + "|" + aKey2 |
| askReserved = app.GetAskReservations(aKey2) |
| if len(askReserved) != 1 && askReserved[0] != nodeKey2 { |
| t.Errorf("app should have reservations for %s on %s and has not", aKey, nodeID2) |
| } |
| |
| // check duplicate reserve: nothing should change |
| if app.canAskReserve(ask) { |
| t.Error("ask has already reserved, reserve check should have failed") |
| } |
| node3 := newNode("node-3", map[string]resources.Quantity{"first": 10}) |
| err = app.Reserve(node3, ask) |
| if err == nil { |
| t.Errorf("reservation should have failed") |
| } |
| askReserved = app.GetAskReservations(aKey) |
| if len(askReserved) != 1 && askReserved[0] != nodeKey1 { |
| t.Errorf("app should have reservations for node %s and has not: %v", nodeID1, askReserved) |
| } |
| askReserved = app.GetAskReservations(aKey2) |
| if len(askReserved) != 1 && askReserved[0] != nodeKey2 { |
| t.Errorf("app should have reservations for node %s and has not: %v", nodeID2, askReserved) |
| } |
| // clean up all asks and reservations |
| reservedAsks := app.RemoveAllocationAsk("") |
| if app.HasReserved() || node1.IsReserved() || node2.IsReserved() || reservedAsks != 2 { |
| t.Errorf("ask removal did not clean up all reservations, reserved released = %d", reservedAsks) |
| } |
| } |
| |
| func TestAllocateDeallocate(t *testing.T) { |
| app := newApplication(appID1, "default", "root.unknown") |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| |
| // failure cases |
| var delta *resources.Resource |
| if delta, err = app.AllocateAsk(""); err == nil || delta != nil { |
| t.Error("empty ask key should not have been found by AllocateAsk()") |
| } |
| if delta, err = app.AllocateAsk("unknown"); err == nil || delta != nil { |
| t.Error("unknown ask key should not have been found by AllocateAsk()") |
| } |
| if delta, err = app.DeallocateAsk(""); err == nil || delta != nil { |
| t.Error("empty ask key should not have been found by DeallocateAsk()") |
| } |
| if delta, err = app.DeallocateAsk("unknown"); err == nil || delta != nil { |
| t.Error("unknown ask key should not have been found by DeallocateAsk()") |
| } |
| |
| // working cases |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) |
| ask := newAllocationAsk(aKey, appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been added to app") |
| // allocate |
| if delta, err := app.AllocateAsk(aKey); err != nil || !resources.Equals(resources.Multiply(res, -1), delta) { |
| t.Errorf("AllocateAsk() did not return correct delta, err %v, expected %v got %v", err, resources.Multiply(res, -1), delta) |
| } |
| // allocate again should fail |
| if delta, err := app.AllocateAsk(aKey); err == nil || delta != nil { |
| t.Error("attempt to call Allocate() twice should have failed") |
| } |
| // deallocate |
| if delta, err := app.DeallocateAsk(aKey); err != nil || !resources.Equals(res, delta) { |
| t.Errorf("DeallocateAsk() did not return correct delta, err %v, expected %v got %v", err, res, delta) |
| } |
| // deallocate again should fail |
| if delta, err := app.DeallocateAsk(aKey); err == nil || delta != nil { |
| t.Error("attempt to call Deallocate() twice should have failed") |
| } |
| } |
| |
| // test pending calculation and ask addition |
| func TestAddAllocAsk(t *testing.T) { |
| app := newApplication(appID1, "default", "root.unknown") |
| // Create event system after new application to avoid new application event. |
| events.Init() |
| eventSystem := events.GetEventSystem().(*events.EventSystemImpl) //nolint:errcheck |
| eventSystem.StartServiceWithPublisher(false) |
| app.disableStateChangeEvents() |
| app.resetAppEvents() |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| |
| // failure cases |
| err = app.AddAllocationAsk(nil) |
| if err == nil { |
| t.Errorf("nil ask should not have been added to app") |
| } |
| res := resources.NewResource() |
| ask := newAllocationAsk(aKey, appID1, res) |
| err = app.AddAllocationAsk(ask) |
| if err == nil { |
| t.Errorf("zero resource ask should not have been added to app") |
| } |
| |
| // add alloc ask |
| res = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) |
| ask = newAllocationAsk(aKey, appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been updated on app") |
| assert.Assert(t, app.IsAccepted(), "Application should be in accepted state") |
| pending := app.GetPendingResource() |
| if !resources.Equals(res, pending) { |
| t.Errorf("pending resource not updated correctly, expected %v but was: %v", res, pending) |
| } |
| |
| // test add alloc ask event |
| noEvents := uint64(0) |
| err = common.WaitFor(10*time.Millisecond, time.Second, func() bool { |
| fmt.Printf("checking event length: %d\n", eventSystem.Store.CountStoredEvents()) |
| noEvents = eventSystem.Store.CountStoredEvents() |
| return noEvents == 2 |
| }) |
| assert.NilError(t, err, "expected 2 events, got %d", noEvents) |
| records := eventSystem.Store.CollectEvents() |
| if records == nil { |
| t.Fatal("collecting eventChannel should return something") |
| } |
| assert.Equal(t, 2, len(records)) |
| record := records[1] |
| assert.Equal(t, si.EventRecord_APP, record.Type, "incorrect event type, expect app") |
| assert.Equal(t, appID1, record.ObjectID, "incorrect object ID, expected application ID") |
| assert.Equal(t, aKey, record.ReferenceID, "incorrect reference ID, expected placeholder alloc ID") |
| assert.Equal(t, si.EventRecord_ADD, record.EventChangeType, "incorrect change type, expected add") |
| assert.Equal(t, si.EventRecord_APP_REQUEST, record.EventChangeDetail, "incorrect change detail, expected app request") |
| eventSystem.Stop() |
| |
| // change resource |
| ask = newAllocationAsk(aKey, appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been updated on app") |
| pending = app.GetPendingResource() |
| if !resources.Equals(resources.Multiply(res, 2), app.GetPendingResource()) { |
| t.Errorf("pending resource not updated correctly, expected %v but was: %v", resources.Multiply(res, 2), pending) |
| } |
| |
| // after all this is must still be in an accepted state |
| assert.Assert(t, app.IsAccepted(), "Application should have stayed in accepted state") |
| |
| // test PlaceholderData |
| tg1 := "tg-1" |
| ask = newAllocationAskTG(aKey, appID1, tg1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been updated on app") |
| app.SetTimedOutPlaceholder(tg1, 1) |
| app.SetTimedOutPlaceholder("tg-2", 2) |
| clonePlaceholderData := app.GetAllPlaceholderData() |
| assert.Equal(t, len(clonePlaceholderData), 1) |
| assert.Equal(t, len(app.placeholderData), 1) |
| assert.Equal(t, clonePlaceholderData[0], app.placeholderData[tg1]) |
| assert.Equal(t, app.placeholderData[tg1].TaskGroupName, tg1) |
| assert.Equal(t, app.placeholderData[tg1].Count, int64(1)) |
| assert.Equal(t, app.placeholderData[tg1].Replaced, int64(0)) |
| assert.Equal(t, app.placeholderData[tg1].TimedOut, int64(1)) |
| assert.DeepEqual(t, app.placeholderData[tg1].MinResource, res) |
| |
| ask = newAllocationAskTG(aKey, appID1, tg1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been updated on app") |
| assert.Equal(t, len(app.placeholderData), 1) |
| assert.Equal(t, app.placeholderData[tg1].TaskGroupName, tg1) |
| assert.Equal(t, app.placeholderData[tg1].Count, int64(2)) |
| assert.Equal(t, app.placeholderData[tg1].Replaced, int64(0)) |
| assert.Equal(t, app.placeholderData[tg1].TimedOut, int64(1)) |
| assert.DeepEqual(t, app.placeholderData[tg1].MinResource, res) |
| |
| tg2 := "tg-2" |
| ask = newAllocationAskTG(aKey, appID1, tg2, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been updated on app") |
| assert.Equal(t, len(app.placeholderData), 2) |
| assert.Equal(t, app.placeholderData[tg2].TaskGroupName, tg2) |
| assert.Equal(t, app.placeholderData[tg2].Count, int64(1)) |
| assert.Equal(t, app.placeholderData[tg2].Replaced, int64(0)) |
| assert.Equal(t, app.placeholderData[tg2].TimedOut, int64(0)) |
| assert.DeepEqual(t, app.placeholderData[tg2].MinResource, res) |
| } |
| |
| // test state change on add and remove ask |
| func TestAllocAskStateChange(t *testing.T) { |
| app := newApplication(appID1, "default", "root.unknown") |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) |
| ask := newAllocationAsk(aKey, appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been added to app") |
| assert.Assert(t, app.IsAccepted(), "Application should be in accepted state") |
| // make sure the state changes to waiting |
| assert.Equal(t, app.RemoveAllocationAsk(aKey), 0, "ask should have been removed, no reservations") |
| assert.Assert(t, app.IsCompleting(), "Application should be in waiting state") |
| |
| // make sure the state changes back correctly |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been added to app") |
| assert.Assert(t, app.IsRunning(), "Application should be in running state") |
| |
| // and back to waiting again, now from running |
| assert.Equal(t, app.RemoveAllocationAsk(aKey), 0, "ask should have been removed, no reservations") |
| assert.Assert(t, app.IsCompleting(), "Application should be in waiting state") |
| |
| log := app.GetStateLog() |
| assert.Equal(t, len(log), 4, "wrong number of app events") |
| assert.Equal(t, log[0].ApplicationState, Accepted.String()) |
| assert.Equal(t, log[1].ApplicationState, Completing.String()) |
| assert.Equal(t, log[2].ApplicationState, Running.String()) |
| assert.Equal(t, log[3].ApplicationState, Completing.String()) |
| assertUserGroupResource(t, getTestUserGroup(), nil) |
| } |
| |
| // test recover ask |
| func TestRecoverAllocAsk(t *testing.T) { |
| app := newApplication(appID1, "default", "root.unknown") |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| |
| // failure cases |
| app.RecoverAllocationAsk(nil) |
| assert.Equal(t, len(app.requests), 0, "nil ask should not be added") |
| |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) |
| ask := newAllocationAsk(aKey, appID1, res) |
| app.RecoverAllocationAsk(ask) |
| assert.Equal(t, len(app.requests), 1, "ask should have been added") |
| assert.Assert(t, app.IsAccepted(), "Application should be in accepted state") |
| assertUserGroupResource(t, getTestUserGroup(), nil) |
| |
| ask = newAllocationAsk("ask-2", appID1, res) |
| app.RecoverAllocationAsk(ask) |
| assert.Equal(t, len(app.requests), 2, "ask should have been added, total should be 2") |
| assert.Assert(t, app.IsAccepted(), "Application should have stayed in accepted state") |
| assertUserGroupResource(t, getTestUserGroup(), nil) |
| |
| assert.Equal(t, 0, len(app.placeholderData)) |
| ask = newAllocationAskTG("ask-3", appID1, "testGroup", res) |
| app.RecoverAllocationAsk(ask) |
| phData := app.placeholderData |
| assert.Equal(t, 1, len(phData)) |
| taskGroupData := phData["testGroup"] |
| assert.Assert(t, taskGroupData != nil) |
| assert.Equal(t, "testGroup", taskGroupData.TaskGroupName) |
| assert.Equal(t, int64(1), taskGroupData.Count) |
| assert.Equal(t, int64(0), taskGroupData.Replaced) |
| assert.Equal(t, int64(0), taskGroupData.TimedOut) |
| assert.Assert(t, resources.Equals(taskGroupData.MinResource, res)) |
| } |
| |
| // test reservations removal by allocation |
| func TestRemoveReservedAllocAsk(t *testing.T) { |
| app := newApplication(appID1, "default", "root.unknown") |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| |
| // create app and allocs |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) |
| ask1 := newAllocationAsk(aKey, appID1, res) |
| err = app.AddAllocationAsk(ask1) |
| assert.NilError(t, err, "resource ask1 should have been added to app") |
| |
| allocKey := "alloc-2" |
| ask2 := newAllocationAsk(allocKey, appID1, res) |
| err = app.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "resource ask2 should have been added to app") |
| |
| // reserve one alloc and remove |
| node := newNode(nodeID1, map[string]resources.Quantity{"first": 10}) |
| err = app.Reserve(node, ask2) |
| assert.NilError(t, err, "reservation should not have failed") |
| if len(app.GetAskReservations(allocKey)) != 1 || !node.IsReserved() { |
| t.Fatalf("app should have reservation for %v on node", allocKey) |
| } |
| before := app.GetPendingResource().Clone() |
| reservedAsks := app.RemoveAllocationAsk(allocKey) |
| delta := resources.Sub(before, app.GetPendingResource()) |
| if !resources.Equals(res, delta) || reservedAsks != 1 { |
| t.Errorf("resource ask2 should have been removed from app: %v, (reserved released = %d)", delta, reservedAsks) |
| } |
| if app.HasReserved() || node.IsReserved() { |
| t.Fatal("app and node should not have reservations") |
| } |
| |
| // reserve again: then remove from node before remove from app |
| err = app.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "resource ask2 should have been added to app") |
| |
| err = app.Reserve(node, ask2) |
| assert.NilError(t, err, "reservation should not have failed: error %v", err) |
| if len(app.GetAskReservations(allocKey)) != 1 || !node.IsReserved() { |
| t.Fatalf("app should have reservation for %v on node", allocKey) |
| } |
| var num int |
| num, err = node.unReserve(app, ask2) |
| assert.NilError(t, err, "un-reserve on node should not have failed") |
| assert.Equal(t, num, 1, "un-reserve on node should have removed reservation") |
| |
| before = app.GetPendingResource().Clone() |
| reservedAsks = app.RemoveAllocationAsk(allocKey) |
| delta = resources.Sub(before, app.GetPendingResource()) |
| if !resources.Equals(res, delta) || reservedAsks != 1 { |
| t.Errorf("resource ask2 should have been removed from app: %v, (reserved released = %d)", delta, reservedAsks) |
| } |
| // app reservation is removed even though the node removal failed |
| if app.HasReserved() || node.IsReserved() { |
| t.Fatal("app and node should not have reservations") |
| } |
| // add a new reservation: use the existing ask1 |
| err = app.Reserve(node, ask1) |
| assert.NilError(t, err, "reservation should not have failed: error %v", err) |
| // clean up |
| reservedAsks = app.RemoveAllocationAsk("") |
| if !resources.IsZero(app.GetPendingResource()) || reservedAsks != 1 { |
| t.Errorf("all resource asks should have been removed from app: %v, (reserved released = %d)", app.GetPendingResource(), reservedAsks) |
| } |
| // app reservation is removed due to ask removal |
| if app.HasReserved() || node.IsReserved() { |
| t.Fatal("app and node should not have reservations") |
| } |
| } |
| |
| // test pending calculation and ask removal |
| func TestRemoveAllocAsk(t *testing.T) { |
| app := newApplication(appID1, "default", "root.unknown") |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| |
| // failures cases: things should not crash (nothing happens) |
| reservedAsks := app.RemoveAllocationAsk("") |
| if !resources.IsZero(app.GetPendingResource()) || reservedAsks != 0 { |
| t.Errorf("pending resource not updated correctly removing all, expected zero but was: %v", app.GetPendingResource()) |
| } |
| reservedAsks = app.RemoveAllocationAsk("unknown") |
| if !resources.IsZero(app.GetPendingResource()) || reservedAsks != 0 { |
| t.Errorf("pending resource not updated correctly removing unknown, expected zero but was: %v", app.GetPendingResource()) |
| } |
| |
| // setup the allocs |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) |
| ask := newAllocationAsk(aKey, appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask 1 should have been added to app") |
| ask = newAllocationAsk(aKey2, appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask 2 should have been added to app") |
| if len(app.requests) != 2 { |
| t.Fatalf("missing asks from app expected 2 got %d", len(app.requests)) |
| } |
| expected := resources.Multiply(res, 2) |
| if !resources.Equals(expected, app.GetPendingResource()) { |
| t.Errorf("pending resource not updated correctly, expected %v but was: %v", expected, app.GetPendingResource()) |
| } |
| |
| // test removes unknown (nothing happens) |
| reservedAsks = app.RemoveAllocationAsk("unknown") |
| if reservedAsks != 0 { |
| t.Errorf("asks released which did not exist: %d", reservedAsks) |
| } |
| delta := app.GetPendingResource().Clone() |
| reservedAsks = app.RemoveAllocationAsk(aKey) |
| delta.SubFrom(app.GetPendingResource()) |
| if !resources.Equals(delta, res) || reservedAsks != 0 { |
| t.Errorf("ask should have been removed from app, err %v, expected delta %v but was: %v, (reserved released = %d)", err, expected, delta, reservedAsks) |
| } |
| reservedAsks = app.RemoveAllocationAsk("") |
| if len(app.requests) != 0 || reservedAsks != 0 { |
| t.Fatalf("asks not removed as expected 0 got %d, (reserved released = %d)", len(app.requests), reservedAsks) |
| } |
| if !resources.IsZero(app.GetPendingResource()) { |
| t.Errorf("pending resource not updated correctly, expected zero but was: %v", app.GetPendingResource()) |
| } |
| } |
| |
| // test pending calculation and ask removal |
| func TestRemoveAllocAskWithPlaceholders(t *testing.T) { |
| app := newApplication(appID1, "default", "root.unknown") |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) |
| ask := newAllocationAsk(aKey, appID1, res) |
| ask.placeholder = true |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask 1 should have been added to app") |
| |
| ask = newAllocationAsk("alloc-2", appID1, res) |
| ask.placeholder = true |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask 2 should have been added to app") |
| |
| reservedAsks := app.RemoveAllocationAsk("alloc-1") |
| assert.Equal(t, 0, reservedAsks) |
| assert.Equal(t, Accepted.String(), app.stateMachine.Current()) |
| assertUserGroupResource(t, getTestUserGroup(), nil) |
| reservedAsks = app.RemoveAllocationAsk("alloc-2") |
| assert.Equal(t, 0, reservedAsks) |
| assert.Equal(t, Completing.String(), app.stateMachine.Current()) |
| assertUserGroupResource(t, getTestUserGroup(), nil) |
| } |
| |
| func TestRemovePlaceholderAllocationWithNoRealAllocation(t *testing.T) { |
| app := newApplication(appID1, "default", "root.unknown") |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) |
| ask := newAllocationAsk(aKey, appID1, res) |
| ask.placeholder = true |
| allocInfo := NewAllocation(nodeID1, ask) |
| app.AddAllocation(allocInfo) |
| err := app.handleApplicationEventWithLocking(RunApplication) |
| assert.NilError(t, err, "no error expected new to accepted") |
| |
| app.RemoveAllocation("alloc-1", si.TerminationType_UNKNOWN_TERMINATION_TYPE) |
| assert.Equal(t, app.stateMachine.Current(), Completing.String()) |
| assertUserGroupResource(t, getTestUserGroup(), nil) |
| } |
| |
| func TestStateChangeOnUpdate(t *testing.T) { |
| // create a fake queue |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| |
| app := newApplication(appID1, "default", "root.unknown") |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| // fake the queue assignment |
| app.queue = queue |
| // app should be new |
| assert.Assert(t, app.IsNew(), "New application did not return new state: %s", app.CurrentState()) |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) |
| askID := "ask-1" |
| ask := newAllocationAsk(askID, appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been added to app") |
| // app with ask should be accepted |
| assert.Assert(t, app.IsAccepted(), "application did not change to accepted state: %s", app.CurrentState()) |
| |
| // removing the ask should move it to waiting |
| released := app.RemoveAllocationAsk(askID) |
| assert.Equal(t, released, 0, "allocation ask should not have been reserved") |
| assert.Assert(t, app.IsCompleting(), "application did not change to waiting state: %s", app.CurrentState()) |
| |
| // start with a fresh state machine |
| app = newApplication(appID1, "default", "root.unknown") |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| // fake the queue assignment |
| app.queue = queue |
| // app should be new |
| assert.Assert(t, app.IsNew(), "New application did not return new state: %s", app.CurrentState()) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been added to app") |
| // app with ask should be accepted |
| assert.Assert(t, app.IsAccepted(), "Application did not change to accepted state: %s", app.CurrentState()) |
| // add an alloc |
| allocInfo := NewAllocation(nodeID1, ask) |
| app.AddAllocation(allocInfo) |
| // app should be running |
| assert.Assert(t, app.IsRunning(), "Application did not return running state after alloc: %s", app.CurrentState()) |
| assertUserGroupResource(t, getTestUserGroup(), res) |
| |
| // removing the ask should not move anywhere as there is an allocation |
| released = app.RemoveAllocationAsk(askID) |
| assert.Equal(t, released, 0, "allocation ask should not have been reserved") |
| assert.Assert(t, app.IsRunning(), "Application should have stayed same, changed unexpectedly: %s", app.CurrentState()) |
| |
| // remove the allocation, ask has been removed so nothing left |
| app.RemoveAllocation(askID, si.TerminationType_UNKNOWN_TERMINATION_TYPE) |
| assert.Assert(t, app.IsCompleting(), "Application did not change as expected: %s", app.CurrentState()) |
| assertUserGroupResource(t, getTestUserGroup(), nil) |
| |
| log := app.GetStateLog() |
| assert.Equal(t, len(log), 3, "wrong number of app events") |
| assert.Equal(t, log[0].ApplicationState, Accepted.String()) |
| assert.Equal(t, log[1].ApplicationState, Running.String()) |
| assert.Equal(t, log[2].ApplicationState, Completing.String()) |
| } |
| |
| func TestStateChangeOnPlaceholderAdd(t *testing.T) { |
| // create a fake queue |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| |
| app := newApplication(appID1, "default", "root.unknown") |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| // fake the queue assignment |
| app.queue = queue |
| // app should be new |
| assert.Assert(t, app.IsNew(), "New application did not return new state: %s", app.CurrentState()) |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) |
| askID := "ask-1" |
| ask := newAllocationAskTG(askID, appID1, "TG1", res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been added to app") |
| // app with ask, even for placeholder, should be accepted |
| assert.Assert(t, app.IsAccepted(), "application did not change to accepted state: %s", app.CurrentState()) |
| |
| // removing the ask should move it to waiting |
| released := app.RemoveAllocationAsk(askID) |
| assert.Equal(t, released, 0, "allocation ask should not have been reserved") |
| assert.Assert(t, app.IsCompleting(), "application did not change to waiting state: %s", app.CurrentState()) |
| |
| // start with a fresh state machine |
| app = newApplication(appID1, "default", "root.unknown") |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| // fake the queue assignment |
| app.queue = queue |
| // app should be new |
| assert.Assert(t, app.IsNew(), "New application did not return new state: %s", app.CurrentState()) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been added to app") |
| // app with ask should be accepted |
| assert.Assert(t, app.IsAccepted(), "Application did not change to accepted state: %s", app.CurrentState()) |
| // add an alloc based on the placeholder ask |
| allocInfo := NewAllocation(nodeID1, ask) |
| app.AddAllocation(allocInfo) |
| // app should be in the same state as it was before as it is a placeholder allocation |
| assert.Assert(t, app.IsAccepted(), "Application did not return accepted state after alloc: %s", app.CurrentState()) |
| assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), res), "placeholder allocation not set as expected") |
| assert.Assert(t, resources.IsZero(app.GetAllocatedResource()), "allocated resource should have been zero") |
| assertUserGroupResource(t, getTestUserGroup(), res) |
| |
| // first we have to remove the allocation itself |
| alloc := app.RemoveAllocation(askID, si.TerminationType_UNKNOWN_TERMINATION_TYPE) |
| assert.Assert(t, alloc != nil, "Nil allocation was returned") |
| assert.Assert(t, app.IsAccepted(), "Application should have stayed in Accepted, changed unexpectedly: %s", app.CurrentState()) |
| // removing the ask should move the application into the waiting state, because the allocation is only a placeholder allocation |
| released = app.RemoveAllocationAsk(askID) |
| assert.Equal(t, released, 0, "allocation ask should not have been reserved") |
| assert.Assert(t, app.IsCompleting(), "Application should have stayed same, changed unexpectedly: %s", app.CurrentState()) |
| assertUserGroupResource(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0})) |
| |
| log := app.GetStateLog() |
| assert.Equal(t, len(log), 2, "wrong number of app events") |
| assert.Equal(t, log[0].ApplicationState, Accepted.String()) |
| assert.Equal(t, log[1].ApplicationState, Completing.String()) |
| } |
| |
| func TestAllocations(t *testing.T) { |
| setupUGM() |
| app := newApplication(appID1, "default", "root.a") |
| |
| // nothing allocated |
| if !resources.IsZero(app.GetAllocatedResource()) { |
| t.Error("new application has allocated resources") |
| } |
| // create an allocation and check the assignment |
| resMap := map[string]string{"memory": "100", "vcores": "10"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "failed to create resource with error") |
| alloc := newAllocation(appID1, nodeID1, res) |
| app.AddAllocation(alloc) |
| if !resources.Equals(app.allocatedResource, res) { |
| t.Errorf("allocated resources is not updated correctly: %v", app.allocatedResource) |
| } |
| allocs := app.GetAllAllocations() |
| assert.Equal(t, len(allocs), 1) |
| assert.Assert(t, app.getPlaceholderTimer() == nil, "Placeholder timer should not be initialized as the allocation is not a placeholder") |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1)) |
| |
| // add more allocations to test the removals |
| alloc = newAllocation(appID1, nodeID1, res) |
| app.AddAllocation(alloc) |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| alloc = newAllocation(appID1, nodeID1, res) |
| app.AddAllocation(alloc) |
| allocs = app.GetAllAllocations() |
| assert.Equal(t, len(allocs), 3) |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 3)) |
| // remove one of the 3 |
| if alloc = app.RemoveAllocation(alloc.GetAllocationKey(), si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc == nil { |
| t.Error("returned allocations was nil allocation was not removed") |
| } |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| // try to remove a non existing alloc |
| if alloc = app.RemoveAllocation("does-not-exist", si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc != nil { |
| t.Errorf("returned allocations was not allocation was incorrectly removed: %v", alloc) |
| } |
| // remove all left over allocations |
| if allocs = app.RemoveAllAllocations(); allocs == nil || len(allocs) != 2 { |
| t.Errorf("returned number of allocations was incorrect: %v", allocs) |
| } |
| allocs = app.GetAllAllocations() |
| assert.Equal(t, len(allocs), 0) |
| assertUserGroupResource(t, getTestUserGroup(), nil) |
| } |
| |
| func TestGangAllocChange(t *testing.T) { |
| resMap := map[string]string{"first": "4"} |
| totalPH, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "failed to create resource with error") |
| |
| app := newApplication(appID1, "default", "root.a") |
| app.placeholderAsk = totalPH |
| assert.Assert(t, app.IsNew(), "newly created app should be in new state") |
| assert.Assert(t, resources.IsZero(app.GetAllocatedResource()), "new application has allocated resources") |
| assert.Assert(t, resources.IsZero(app.GetPlaceholderResource()), "new application has placeholder allocated resources") |
| assert.Assert(t, resources.Equals(app.GetPlaceholderAsk(), totalPH), "placeholder ask resource not set as expected") |
| assertUserGroupResource(t, getTestUserGroup(), nil) |
| |
| // move the app to the accepted state as if we added an ask |
| app.SetState(Accepted.String()) |
| // create an allocation and check the assignment |
| resMap = map[string]string{"first": "2"} |
| var res *resources.Resource |
| res, err = resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "failed to create resource with error") |
| alloc := newAllocation(appID1, nodeID1, res) |
| alloc.placeholder = true |
| app.AddAllocation(alloc) |
| assert.Assert(t, resources.Equals(app.allocatedPlaceholder, res), "allocated placeholders resources is not updated correctly: %s", app.allocatedPlaceholder.String()) |
| assert.Equal(t, len(app.GetAllAllocations()), 1) |
| assert.Assert(t, app.IsAccepted(), "app should still be in accepted state") |
| assertUserGroupResource(t, getTestUserGroup(), res) |
| |
| // add second placeholder this should trigger state update |
| alloc = newAllocation(appID1, nodeID1, res) |
| alloc.placeholder = true |
| app.AddAllocation(alloc) |
| assert.Assert(t, resources.Equals(app.allocatedPlaceholder, totalPH), "allocated placeholders resources is not updated correctly: %s", app.allocatedPlaceholder.String()) |
| assert.Equal(t, len(app.GetAllAllocations()), 2) |
| assert.Assert(t, app.IsRunning(), "app should have changed to running state") |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| |
| // add a real alloc this should NOT trigger state update |
| alloc = newAllocation(appID1, nodeID1, res) |
| alloc.SetResult(Replaced) |
| app.AddAllocation(alloc) |
| assert.Equal(t, len(app.GetAllAllocations()), 3) |
| assert.Assert(t, app.IsRunning(), "app should still be in running state") |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 3)) |
| |
| // add a second real alloc this should NOT trigger state update |
| alloc = newAllocation(appID1, nodeID1, res) |
| alloc.SetResult(Replaced) |
| app.AddAllocation(alloc) |
| assert.Equal(t, len(app.GetAllAllocations()), 4) |
| assert.Assert(t, app.IsRunning(), "app should still be in running state") |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 4)) |
| } |
| |
| func TestAllocChange(t *testing.T) { |
| setupUGM() |
| app := newApplication(appID1, "default", "root.a") |
| assert.Assert(t, app.IsNew(), "newly created app should be in new state") |
| assert.Assert(t, resources.IsZero(app.GetAllocatedResource()), "new application has allocated resources") |
| assertUserGroupResource(t, getTestUserGroup(), nil) |
| |
| // move the app to the accepted state as if we added an ask |
| app.SetState(Accepted.String()) |
| // create an allocation and check the assignment |
| resMap := map[string]string{"first": "2"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "failed to create resource with error") |
| alloc := newAllocation(appID1, nodeID1, res) |
| // adding a normal allocation should change the state |
| app.AddAllocation(alloc) |
| assert.Assert(t, resources.Equals(app.allocatedResource, res), "allocated resources is not updated correctly: %s", app.allocatedResource.String()) |
| assert.Equal(t, len(app.GetAllAllocations()), 1) |
| assert.Assert(t, app.IsRunning(), "app should be in running state") |
| assertUserGroupResource(t, getTestUserGroup(), res) |
| |
| // add a second real alloc this should trigger state update |
| alloc = newAllocation(appID1, nodeID1, res) |
| app.AddAllocation(alloc) |
| assert.Equal(t, len(app.GetAllAllocations()), 2) |
| assert.Assert(t, app.IsRunning(), "app should have changed to running` state") |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| } |
| |
| func TestQueueUpdate(t *testing.T) { |
| app := newApplication(appID1, "default", "root.a") |
| |
| root, err := createRootQueue(nil) |
| assert.NilError(t, err, "failed to create root queue") |
| queue, err := createDynamicQueue(root, "test", false) |
| assert.NilError(t, err, "failed to create test queue") |
| app.SetQueue(queue) |
| assert.Equal(t, app.GetQueuePath(), "root.test") |
| } |
| |
| func TestCompleted(t *testing.T) { |
| completingTimeout = time.Millisecond * 100 |
| terminatedTimeout = time.Millisecond * 100 |
| defer func() { |
| completingTimeout = time.Second * 30 |
| terminatedTimeout = 3 * 24 * time.Hour |
| }() |
| app := newApplication(appID1, "default", "root.a") |
| app.requests = map[string]*AllocationAsk{ |
| "test": {}, |
| } |
| app.sortedRequests = append(app.sortedRequests, &AllocationAsk{}) |
| err := app.handleApplicationEventWithLocking(RunApplication) |
| assert.NilError(t, err, "no error expected new to accepted (completed test)") |
| err = app.handleApplicationEventWithLocking(CompleteApplication) |
| assert.NilError(t, err, "no error expected accepted to completing (completed test)") |
| assert.Assert(t, app.IsCompleting(), "App should be waiting") |
| // give it some time to run and progress |
| err = common.WaitFor(10*time.Microsecond, time.Millisecond*200, app.IsCompleted) |
| assert.NilError(t, err, "Application did not progress into Completed state") |
| |
| err = common.WaitFor(1*time.Millisecond, time.Millisecond*200, app.IsExpired) |
| assert.NilError(t, err, "Application did not progress into Expired state") |
| |
| assert.Assert(t, app.sortedRequests == nil) |
| assert.Equal(t, 0, len(app.requests)) |
| log := app.GetStateLog() |
| assert.Equal(t, len(log), 4, "wrong number of app events") |
| assert.Equal(t, log[0].ApplicationState, Accepted.String()) |
| assert.Equal(t, log[1].ApplicationState, Completing.String()) |
| assert.Equal(t, log[2].ApplicationState, Completed.String()) |
| assert.Equal(t, log[3].ApplicationState, Expired.String()) |
| } |
| |
| func assertResourceUsage(t *testing.T, appSummary *ApplicationSummary, memorySeconds int64, vcoresSecconds int64) { |
| detailedResource := appSummary.ResourceUsage.TrackedResourceMap[instType1] |
| assert.Equal(t, memorySeconds, detailedResource["memory"]) |
| assert.Equal(t, vcoresSecconds, detailedResource["vcores"]) |
| } |
| |
| func assertPlaceHolderResource(t *testing.T, appSummary *ApplicationSummary, memorySeconds int64, |
| vcoresSecconds int64) { |
| detailedResource := appSummary.PlaceholderResource.TrackedResourceMap[instType1] |
| assert.Equal(t, memorySeconds, detailedResource["memory"]) |
| assert.Equal(t, vcoresSecconds, detailedResource["vcores"]) |
| } |
| |
| func TestResourceUsageAggregation(t *testing.T) { |
| setupUGM() |
| |
| app := newApplication(appID1, "default", "root.a") |
| |
| // nothing allocated |
| if !resources.IsZero(app.GetAllocatedResource()) { |
| t.Error("new application has allocated resources") |
| } |
| // create an allocation and check the assignment |
| resMap := map[string]string{"memory": "100", "vcores": "10"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "failed to create resource with error") |
| alloc := newAllocation(appID1, nodeID1, res) |
| alloc.SetInstanceType(instType1) |
| // Mock the time to be 3 seconds before |
| alloc.SetBindTime(time.Now().Add(-3 * time.Second)) |
| app.AddAllocation(alloc) |
| |
| if !resources.Equals(app.allocatedResource, res) { |
| t.Errorf("allocated resources is not updated correctly: %v", app.allocatedResource) |
| } |
| allocs := app.GetAllAllocations() |
| assert.Equal(t, len(allocs), 1) |
| assert.Assert(t, app.getPlaceholderTimer() == nil, "Placeholder timer should not be initialized as the allocation is not a placeholder") |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1)) |
| |
| err = app.HandleApplicationEvent(RunApplication) |
| assert.NilError(t, err, "no error expected new to accepted (completed test)") |
| |
| appSummary := app.GetApplicationSummary("default") |
| appSummary.DoLogging() |
| assertResourceUsage(t, appSummary, 0, 0) |
| |
| // add more allocations to test the removals |
| alloc = newAllocation(appID1, nodeID1, res) |
| alloc.SetInstanceType(instType1) |
| |
| // Mock the time to be 3 seconds before |
| alloc.SetBindTime(time.Now().Add(-3 * time.Second)) |
| app.AddAllocation(alloc) |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| |
| // remove one of the 2 |
| if alloc = app.RemoveAllocation(alloc.GetAllocationKey(), si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc == nil { |
| t.Error("returned allocations was nil allocation was not removed") |
| } |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1)) |
| |
| appSummary = app.GetApplicationSummary("default") |
| appSummary.DoLogging() |
| assertResourceUsage(t, appSummary, 300, 30) |
| |
| alloc = newAllocation(appID1, nodeID1, res) |
| alloc.SetInstanceType(instType1) |
| app.AddAllocation(alloc) |
| allocs = app.GetAllAllocations() |
| assert.Equal(t, len(allocs), 2) |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| |
| appSummary = app.GetApplicationSummary("default") |
| appSummary.DoLogging() |
| assertResourceUsage(t, appSummary, 300, 30) |
| |
| // try to remove a non existing alloc |
| if alloc = app.RemoveAllocation("does-not-exist", si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc != nil { |
| t.Errorf("returned allocations was not allocation was incorrectly removed: %v", alloc) |
| } |
| |
| // remove all left over allocations |
| if allocs = app.RemoveAllAllocations(); allocs == nil || len(allocs) != 2 { |
| t.Errorf("returned number of allocations was incorrect: %v", allocs) |
| } |
| allocs = app.GetAllAllocations() |
| assert.Equal(t, len(allocs), 0) |
| assertUserGroupResource(t, getTestUserGroup(), nil) |
| |
| err = app.HandleApplicationEvent(CompleteApplication) |
| assert.NilError(t, err, "no error expected accepted to completing (completed test)") |
| |
| appSummary = app.GetApplicationSummary("default") |
| appSummary.DoLogging() |
| assertResourceUsage(t, appSummary, 600, 60) |
| } |
| |
| func TestRejected(t *testing.T) { |
| terminatedTimeout = time.Millisecond * 100 |
| defer func() { |
| terminatedTimeout = 3 * 24 * time.Hour |
| }() |
| app := newApplication(appID1, "default", "root.a") |
| rejectedMessage := fmt.Sprintf("Failed to place application %s: application rejected: no placement rule matched", app.ApplicationID) |
| err := app.handleApplicationEventWithInfoLocking(RejectApplication, rejectedMessage) |
| assert.NilError(t, err, "no error expected new to rejected") |
| |
| err = common.WaitFor(1*time.Millisecond, time.Millisecond*200, app.IsRejected) |
| assert.NilError(t, err, "Application did not progress into Rejected state") |
| assert.Assert(t, !app.FinishedTime().IsZero()) |
| assert.Equal(t, app.rejectedMessage, rejectedMessage) |
| assert.Equal(t, app.GetRejectedMessage(), rejectedMessage) |
| |
| err = common.WaitFor(1*time.Millisecond, time.Millisecond*200, app.IsExpired) |
| assert.NilError(t, err, "Application did not progress into Expired state") |
| |
| log := app.GetStateLog() |
| assert.Equal(t, len(log), 2, "wrong number of app events") |
| assert.Equal(t, log[0].ApplicationState, Rejected.String()) |
| assert.Equal(t, log[1].ApplicationState, Expired.String()) |
| } |
| |
| func TestGetTag(t *testing.T) { |
| app := newApplicationWithTags(appID1, "default", "root.a", nil) |
| tag := app.GetTag("") |
| assert.Equal(t, tag, "", "expected empty tag value if tags nil") |
| tags := make(map[string]string) |
| app = newApplicationWithTags(appID1, "default", "root.a", tags) |
| tag = app.GetTag("") |
| assert.Equal(t, tag, "", "expected empty tag value if no tags defined") |
| tags["test"] = "test value" |
| app = newApplicationWithTags(appID1, "default", "root.a", tags) |
| tag = app.GetTag("notfound") |
| assert.Equal(t, tag, "", "expected empty tag value if tag not found") |
| tag = app.GetTag("test") |
| assert.Equal(t, tag, "test value", "expected tag value") |
| } |
| |
| func TestIsCreateForced(t *testing.T) { |
| app := newApplicationWithTags(appID1, "default", "root.a", nil) |
| assert.Check(t, !app.IsCreateForced(), "found forced app but tags nil") |
| tags := make(map[string]string) |
| app = newApplicationWithTags(appID1, "default", "root.a", tags) |
| assert.Check(t, !app.IsCreateForced(), "found forced app but tags empty") |
| tags[siCommon.AppTagCreateForce] = "false" |
| app = newApplicationWithTags(appID1, "default", "root.a", tags) |
| assert.Check(t, !app.IsCreateForced(), "found forced app but forced tag was false") |
| tags[siCommon.AppTagCreateForce] = "unknown" |
| app = newApplicationWithTags(appID1, "default", "root.a", tags) |
| assert.Check(t, !app.IsCreateForced(), "found forced app but forced tag was invalid") |
| tags[siCommon.AppTagCreateForce] = "true" |
| app = newApplicationWithTags(appID1, "default", "root.a", tags) |
| assert.Check(t, app.IsCreateForced(), "found unforced app but forced tag was set") |
| } |
| |
| func TestOnStatusChangeCalled(t *testing.T) { |
| app, testHandler := newApplicationWithHandler(appID1, "default", "root.a") |
| assert.Equal(t, New.String(), app.CurrentState(), "new app not in New state") |
| |
| err := app.handleApplicationEventWithLocking(RunApplication) |
| assert.NilError(t, err, "error returned which was not expected") |
| assert.Assert(t, testHandler.IsHandled(), "handler did not get called as expected") |
| |
| // accepted to rejected: error expected |
| err = app.handleApplicationEventWithLocking(RejectApplication) |
| assert.Assert(t, err != nil, "error expected and not seen") |
| assert.Equal(t, app.CurrentState(), Accepted.String(), "application state has been changed unexpectedly") |
| assert.Assert(t, !testHandler.IsHandled(), "unexpected event send to the RM") |
| |
| log := app.GetStateLog() |
| assert.Equal(t, len(log), 1, "wrong number of app events") |
| assert.Equal(t, log[0].ApplicationState, Accepted.String()) |
| } |
| |
| func TestReplaceAllocation(t *testing.T) { |
| setupUGM() |
| |
| app := newApplication(appID1, "default", "root.a") |
| assert.Equal(t, New.String(), app.CurrentState(), "new app not in New state") |
| // state changes are not important |
| app.SetState(Running.String()) |
| |
| // non existing allocation |
| var nilAlloc *Allocation |
| alloc := app.ReplaceAllocation("") |
| assert.Equal(t, alloc, nilAlloc, "expected nil to be returned got a real alloc: %s", alloc) |
| // create an allocation and check the assignment |
| resMap := map[string]string{"memory": "100", "vcores": "10"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "failed to create resource with error") |
| ph := newPlaceholderAlloc(appID1, nodeID1, res) |
| // add the placeholder to the app |
| app.AddAllocation(ph) |
| // add PlaceholderData |
| app.addPlaceholderDataWithLocking(ph.GetAsk()) |
| assert.Equal(t, len(app.placeholderData), 1) |
| assert.Equal(t, app.placeholderData[""].TaskGroupName, "") |
| assert.Equal(t, app.placeholderData[""].Count, int64(1)) |
| assert.Equal(t, app.placeholderData[""].Replaced, int64(0)) |
| assert.Equal(t, app.placeholderData[""].TimedOut, int64(0)) |
| assert.DeepEqual(t, app.placeholderData[""].MinResource, res) |
| |
| assert.Equal(t, len(app.allocations), 1, "allocation not added as expected") |
| assert.Assert(t, resources.IsZero(app.allocatedResource), "placeholder counted as real allocation") |
| if !resources.Equals(app.allocatedPlaceholder, res) { |
| t.Fatalf("placeholder allocation not updated as expected: got %s, expected %s", app.allocatedPlaceholder, res) |
| } |
| assertUserGroupResource(t, getTestUserGroup(), res) |
| alloc = app.ReplaceAllocation(ph.GetAllocationKey()) |
| assert.Equal(t, alloc, nilAlloc, "placeholder without releases expected nil to be returned got a real alloc: %s", alloc) |
| assert.Equal(t, app.placeholderData[""].Replaced, int64(0)) |
| assertUserGroupResource(t, getTestUserGroup(), nil) |
| // add the placeholder back to the app, the failure test above changed state and removed the ph |
| app.SetState(Running.String()) |
| app.AddAllocation(ph) |
| app.addPlaceholderDataWithLocking(ph.GetAsk()) |
| assert.Equal(t, app.placeholderData[""].Count, int64(2)) |
| assertUserGroupResource(t, getTestUserGroup(), res) |
| |
| // set the real one to replace the placeholder |
| realAlloc := newAllocation(appID1, nodeID1, res) |
| realAlloc.SetResult(Replaced) |
| ph.AddRelease(realAlloc) |
| alloc = app.ReplaceAllocation(ph.GetAllocationKey()) |
| assert.Equal(t, alloc, ph, "returned allocation is not the placeholder") |
| assert.Assert(t, resources.IsZero(app.allocatedPlaceholder), "real allocation counted as placeholder") |
| if !resources.Equals(app.allocatedResource, res) { |
| t.Fatalf("real allocation not updated as expected: got %s, expected %s", app.allocatedResource, res) |
| } |
| assert.Equal(t, app.placeholderData[""].Replaced, int64(1)) |
| assert.Equal(t, realAlloc.GetPlaceholderCreateTime(), ph.GetCreateTime(), "real allocation's placeholder create time not updated as expected: got %s, expected %s", realAlloc.GetPlaceholderCreateTime(), ph.GetCreateTime()) |
| assertUserGroupResource(t, getTestUserGroup(), res) |
| |
| // add the placeholder back to the app, the failure test above changed state and removed the ph |
| app.SetState(Running.String()) |
| ph.ClearReleases() |
| app.AddAllocation(ph) |
| app.addPlaceholderDataWithLocking(ph.GetAsk()) |
| assert.Equal(t, app.placeholderData[""].Count, int64(3)) |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| |
| // set multiple real allocations to replace the placeholder |
| realAlloc = newAllocation(appID1, nodeID1, res) |
| realAlloc.SetResult(Replaced) |
| ph.AddRelease(realAlloc) |
| realAllocNoAdd := newAllocation(appID1, nodeID1, res) |
| realAllocNoAdd.SetResult(Replaced) |
| ph.AddRelease(realAlloc) |
| alloc = app.ReplaceAllocation(ph.GetAllocationKey()) |
| assert.Equal(t, alloc, ph, "returned allocation is not the placeholder") |
| assert.Assert(t, resources.IsZero(app.allocatedPlaceholder), "real allocation counted as placeholder") |
| assert.Equal(t, app.placeholderData[""].Replaced, int64(2)) |
| // after the second replace we have 2 real allocations |
| if !resources.Equals(app.allocatedResource, resources.Multiply(res, 2)) { |
| t.Fatalf("real allocation not updated as expected: got %s, expected %s", app.allocatedResource, resources.Multiply(res, 2)) |
| } |
| assert.Equal(t, realAlloc.GetPlaceholderCreateTime(), ph.GetCreateTime(), "real allocation's placeholder create time not updated as expected: got %s, expected %s", realAlloc.GetPlaceholderCreateTime(), ph.GetCreateTime()) |
| if _, ok := app.allocations["not-added"]; ok { |
| t.Fatalf("real allocation added which shouldn't have been added") |
| } |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| } |
| |
| func TestReplaceAllocationTracking(t *testing.T) { |
| setupUGM() |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app := newApplication(appID1, "default", "root.a") |
| app.queue = queue |
| resMap := map[string]string{"memory": "100", "vcores": "10"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "failed to create resource with error") |
| ph1 := newPlaceholderAlloc(appID1, nodeID1, res) |
| ph2 := newPlaceholderAlloc(appID1, nodeID1, res) |
| ph3 := newPlaceholderAlloc(appID1, nodeID1, res) |
| ph1.SetInstanceType(instType1) |
| ph2.SetInstanceType(instType1) |
| ph3.SetInstanceType(instType1) |
| app.AddAllocation(ph1) |
| assert.NilError(t, err, "could not add ask") |
| app.addPlaceholderDataWithLocking(ph1.GetAsk()) |
| assert.Equal(t, true, app.HasPlaceholderAllocation()) |
| app.AddAllocation(ph2) |
| assert.NilError(t, err, "could not add ask") |
| app.addPlaceholderDataWithLocking(ph2.GetAsk()) |
| app.AddAllocation(ph3) |
| assert.NilError(t, err, "could not add ask") |
| app.addPlaceholderDataWithLocking(ph3.GetAsk()) |
| |
| ph1.SetBindTime(time.Now().Add(-10 * time.Second)) |
| ph2.SetBindTime(time.Now().Add(-10 * time.Second)) |
| ph3.SetBindTime(time.Now().Add(-10 * time.Second)) |
| |
| // replace placeholders |
| realAlloc1 := newAllocation(appID1, nodeID1, res) |
| realAlloc1.SetResult(Replaced) |
| ph1.AddRelease(realAlloc1) |
| alloc1 := app.ReplaceAllocation(ph1.GetAllocationKey()) |
| app.RemoveAllocation(ph1.GetAllocationKey(), si.TerminationType_PLACEHOLDER_REPLACED) |
| assert.Equal(t, ph1.GetAllocationKey(), alloc1.GetAllocationKey()) |
| assert.Equal(t, true, app.HasPlaceholderAllocation()) |
| realAlloc2 := newAllocation(appID1, nodeID1, res) |
| realAlloc2.SetResult(Replaced) |
| ph2.AddRelease(realAlloc2) |
| alloc2 := app.ReplaceAllocation(ph2.GetAllocationKey()) |
| app.RemoveAllocation(ph2.GetAllocationKey(), si.TerminationType_PLACEHOLDER_REPLACED) |
| assert.Equal(t, ph2.GetAllocationKey(), alloc2.GetAllocationKey()) |
| assert.Equal(t, true, app.HasPlaceholderAllocation()) |
| realAlloc3 := newAllocation(appID1, nodeID1, res) |
| realAlloc3.SetResult(Replaced) |
| ph3.AddRelease(realAlloc3) |
| alloc3 := app.ReplaceAllocation(ph3.GetAllocationKey()) |
| app.RemoveAllocation(ph3.GetAllocationKey(), si.TerminationType_PLACEHOLDER_REPLACED) |
| assert.Equal(t, ph3.GetAllocationKey(), alloc3.GetAllocationKey()) |
| assert.Equal(t, false, app.HasPlaceholderAllocation()) |
| |
| // check placeholder resource usage |
| appSummary := app.GetApplicationSummary("default") |
| assertPlaceHolderResource(t, appSummary, 3000, 300) |
| } |
| |
| func TestTimeoutPlaceholderSoftStyle(t *testing.T) { |
| runTimeoutPlaceholderTest(t, Resuming.String(), Soft) |
| } |
| |
| func TestTimeoutPlaceholderAllocAsk(t *testing.T) { |
| runTimeoutPlaceholderTest(t, Failing.String(), Hard) |
| } |
| |
| func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulingStyle string) { |
| setupUGM() |
| // create a fake queue |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| |
| originalPhTimeout := defaultPlaceholderTimeout |
| defaultPlaceholderTimeout = 5 * time.Millisecond |
| defer func() { defaultPlaceholderTimeout = originalPhTimeout }() |
| |
| app, testHandler := newApplicationWithHandler(appID1, "default", "root.a") |
| app.gangSchedulingStyle = gangSchedulingStyle |
| assert.Assert(t, app.getPlaceholderTimer() == nil, "Placeholder timer should be nil on create") |
| // fake the queue assignment (needed with ask) |
| app.queue = queue |
| |
| resMap := map[string]string{"memory": "100", "vcores": "10"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "Unexpected error when creating resource from map") |
| // add the placeholder ask to the app |
| tg1 := "tg-1" |
| phAsk := newAllocationAskTG("ask-1", appID1, tg1, res) |
| err = app.AddAllocationAsk(phAsk) |
| assert.NilError(t, err, "Application ask should have been added") |
| assert.Assert(t, app.IsAccepted(), "Application should be in accepted state") |
| |
| // check PlaceHolderData |
| assert.Equal(t, len(app.placeholderData), 1) |
| assert.Equal(t, app.placeholderData[tg1].TaskGroupName, tg1) |
| assert.Equal(t, app.placeholderData[tg1].Count, int64(1)) |
| assert.Equal(t, app.placeholderData[tg1].Replaced, int64(0)) |
| assert.Equal(t, app.placeholderData[tg1].TimedOut, int64(0)) |
| assert.DeepEqual(t, app.placeholderData[tg1].MinResource, res) |
| |
| // add the placeholder to the app |
| ph := newPlaceholderAlloc(appID1, nodeID1, res) |
| app.AddAllocation(ph) |
| assertUserGroupResource(t, getTestUserGroup(), res) |
| assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation") |
| // add a second one to check the filter |
| ph = newPlaceholderAlloc(appID1, nodeID1, res) |
| app.AddAllocation(ph) |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| err = common.WaitFor(10*time.Millisecond, 1*time.Second, func() bool { |
| app.RLock() |
| defer app.RUnlock() |
| return app.placeholderTimer == nil |
| }) |
| assert.Equal(t, app.placeholderData[tg1].TimedOut, app.placeholderData[tg1].Count, "When the app is in an accepted state, timeout should equal to count") |
| assert.NilError(t, err, "Placeholder timeout cleanup did not trigger unexpectedly") |
| assert.Equal(t, app.stateMachine.Current(), expectedState, "Application did not progress into expected state") |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| events := testHandler.GetEvents() |
| var found int |
| for _, event := range events { |
| if allocRelease, ok := event.(*rmevent.RMReleaseAllocationEvent); ok { |
| assert.Equal(t, len(allocRelease.ReleasedAllocations), 2, "two allocations should have been released") |
| found++ |
| } |
| if askRelease, ok := event.(*rmevent.RMReleaseAllocationAskEvent); ok { |
| assert.Equal(t, len(askRelease.ReleasedAllocationAsks), 1, "one allocation ask should have been released") |
| found++ |
| } |
| } |
| // check if the Replaced of PlaceHolderData is 0 |
| assert.Equal(t, app.placeholderData[tg1].Replaced, int64(0)) |
| // Because the Count of PlaceHolderData is only added in AddAllocationAsk, so it is 1 |
| assert.Equal(t, app.placeholderData[tg1].Count, int64(1)) |
| |
| assert.Equal(t, found, 2, "release allocation or ask event not found in list") |
| // asks are completely cleaned up |
| assert.Assert(t, resources.IsZero(app.GetPendingResource()), "pending placeholder resources should be zero") |
| // a released placeholder still holds the resource until release confirmed by the RM |
| assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), resources.Multiply(res, 2)), "Unexpected placeholder resources for the app") |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| |
| log := app.GetStateLog() |
| assert.Equal(t, len(log), 2, "wrong number of app events") |
| assert.Equal(t, log[0].ApplicationState, Accepted.String()) |
| assert.Equal(t, log[1].ApplicationState, expectedState) |
| } |
| |
| func TestTimeoutPlaceholderAllocReleased(t *testing.T) { |
| setupUGM() |
| |
| originalPhTimeout := defaultPlaceholderTimeout |
| defaultPlaceholderTimeout = 100 * time.Millisecond |
| defer func() { defaultPlaceholderTimeout = originalPhTimeout }() |
| |
| app, testHandler := newApplicationWithHandler(appID1, "default", "root.a") |
| assert.Assert(t, app.getPlaceholderTimer() == nil, "Placeholder timer should be nil on create") |
| app.SetState(Accepted.String()) |
| |
| resMap := map[string]string{"memory": "100", "vcores": "10"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "Unexpected error when creating resource from map") |
| // add the placeholders to the app: one released, one still available. |
| ph := newPlaceholderAlloc(appID1, nodeID1, res) |
| ph.SetReleased(true) |
| app.AddAllocation(ph) |
| // add PlaceholderData |
| app.addPlaceholderDataWithLocking(ph.GetAsk()) |
| assert.Equal(t, len(app.placeholderData), 1) |
| assert.Equal(t, app.placeholderData[""].TaskGroupName, "") |
| assert.Equal(t, app.placeholderData[""].Count, int64(1)) |
| assert.Equal(t, app.placeholderData[""].Replaced, int64(0)) |
| assert.Equal(t, app.placeholderData[""].TimedOut, int64(0)) |
| assert.DeepEqual(t, app.placeholderData[""].MinResource, res) |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1)) |
| |
| assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation") |
| ph = newPlaceholderAlloc(appID1, nodeID1, res) |
| app.AddAllocation(ph) |
| app.addPlaceholderDataWithLocking(ph.GetAsk()) |
| assert.Equal(t, app.placeholderData[""].Count, int64(2)) |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| |
| alloc := newAllocation(appID1, nodeID1, res) |
| app.AddAllocation(alloc) |
| assert.Assert(t, app.IsRunning(), "App should be in running state after the first allocation") |
| err = common.WaitFor(10*time.Millisecond, 1*time.Second, func() bool { |
| return app.getPlaceholderTimer() == nil |
| }) |
| assert.NilError(t, err, "Placeholder timeout cleanup did not trigger unexpectedly") |
| assert.Assert(t, app.IsRunning(), "App should be in running state after the first allocation") |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 3)) |
| // two state updates and 1 release event |
| events := testHandler.GetEvents() |
| var found bool |
| for _, event := range events { |
| if allocRelease, ok := event.(*rmevent.RMReleaseAllocationEvent); ok { |
| assert.Equal(t, len(allocRelease.ReleasedAllocations), 1, "one allocation should have been released") |
| assert.Equal(t, allocRelease.ReleasedAllocations[0].AllocationKey, ph.allocationKey, "wrong placeholder allocation released on timeout") |
| found = true |
| } |
| if _, ok := event.(*rmevent.RMReleaseAllocationAskEvent); ok { |
| t.Fatal("unexpected release allocation ask event found in list of events") |
| } |
| } |
| assert.Assert(t, found, "release allocation event not found in list") |
| assert.Assert(t, resources.Equals(app.GetAllocatedResource(), res), "Unexpected allocated resources for the app") |
| // a released placeholder still holds the resource until release confirmed by the RM |
| assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), resources.Multiply(res, 2)), "Unexpected placeholder resources for the app") |
| assert.Equal(t, app.placeholderData[""].Replaced, int64(0)) |
| assert.Equal(t, app.placeholderData[""].TimedOut, int64(1)) |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 3)) |
| } |
| |
| func TestTimeoutPlaceholderCompleting(t *testing.T) { |
| setupUGM() |
| phTimeout := common.ConvertSITimeout(5) |
| app, testHandler := newApplicationWithPlaceholderTimeout(appID1, "default", "root.a", 5) |
| assert.Assert(t, app.getPlaceholderTimer() == nil, "Placeholder timer should be nil on create") |
| assert.Equal(t, app.execTimeout, phTimeout, "placeholder timeout not initialised correctly") |
| app.SetState(Accepted.String()) |
| |
| resMap := map[string]string{"memory": "100", "vcores": "10"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "Unexpected error when creating resource from map") |
| // add the placeholder to the app |
| ph := newPlaceholderAlloc(appID1, nodeID1, res) |
| app.AddAllocation(ph) |
| assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation") |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1)) |
| // add a real allocation as well |
| alloc := newAllocation(appID1, nodeID1, res) |
| app.AddAllocation(alloc) |
| // move on to running |
| app.SetState(Running.String()) |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| // remove allocation to trigger state change |
| app.RemoveAllocation(alloc.GetAllocationKey(), si.TerminationType_UNKNOWN_TERMINATION_TYPE) |
| assert.Assert(t, app.IsCompleting(), "App should be in completing state all allocs have been removed") |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1)) |
| // make sure the placeholders time out |
| err = common.WaitFor(10*time.Millisecond, 1*time.Second, func() bool { |
| return app.getPlaceholderTimer() == nil |
| }) |
| assert.NilError(t, err, "Placeholder timer did not time out as expected") |
| events := testHandler.GetEvents() |
| var found bool |
| for _, event := range events { |
| if allocRelease, ok := event.(*rmevent.RMReleaseAllocationEvent); ok { |
| assert.Equal(t, len(allocRelease.ReleasedAllocations), 1, "one allocation should have been released") |
| found = true |
| } |
| if _, ok := event.(*rmevent.RMReleaseAllocationAskEvent); ok { |
| t.Fatal("unexpected release allocation ask event found in list of events") |
| } |
| } |
| assert.Assert(t, found, "release allocation event not found in list") |
| assert.Assert(t, app.IsCompleting(), "App should still be in completing state") |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1)) |
| } |
| |
| func TestAppTimersAfterAppRemoval(t *testing.T) { |
| setupUGM() |
| phTimeout := common.ConvertSITimeout(50) |
| app, _ := newApplicationWithPlaceholderTimeout(appID1, "default", "root.a", 50) |
| assert.Assert(t, app.getPlaceholderTimer() == nil, "Placeholder timer should be nil on create") |
| assert.Equal(t, app.execTimeout, phTimeout, "placeholder timeout not initialised correctly") |
| app.SetState(Accepted.String()) |
| |
| resMap := map[string]string{"memory": "100", "vcores": "10"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "Unexpected error when creating resource from map") |
| // add the placeholder to the app |
| ph := newPlaceholderAlloc(appID1, nodeID1, res) |
| app.AddAllocation(ph) |
| assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation") |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1)) |
| // add a real allocation as well |
| alloc := newAllocation(appID1, nodeID1, res) |
| app.AddAllocation(alloc) |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| // move on to running |
| app.SetState(Running.String()) |
| app.RemoveAllAllocations() |
| assert.Assert(t, app.IsCompleting(), "App should be in completing state all allocs have been removed") |
| assertUserGroupResource(t, getTestUserGroup(), nil) |
| if app.getPlaceholderTimer() != nil { |
| t.Fatalf("Placeholder timer has not be cleared after app removal as expected, %v", app.getPlaceholderTimer()) |
| } |
| if app.stateTimer != nil { |
| t.Fatalf("State timer has not be cleared after app removal as expected, %v", app.stateTimer) |
| } |
| } |
| |
| func TestIncAndDecUserResourceUsage(t *testing.T) { |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) |
| app := newApplication(appID1, "default", "root.unknown") |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| assertUserGroupResource(t, getTestUserGroup(), nil) |
| app.incUserResourceUsage(nil) |
| assertUserGroupResource(t, getTestUserGroup(), nil) |
| app.incUserResourceUsage(res) |
| assertUserGroupResource(t, getTestUserGroup(), res) |
| app.incUserResourceUsage(res) |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| app.decUserResourceUsage(nil, false) |
| assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| app.decUserResourceUsage(res, false) |
| assertUserGroupResource(t, getTestUserGroup(), res) |
| app.decUserResourceUsage(res, false) |
| assertUserGroupResource(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0})) |
| } |
| |
| func TestIncAndDecUserResourceUsageInSameGroup(t *testing.T) { |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) |
| testgroups := []string{"testgroup"} |
| app := newApplicationWithUserGroup(appID1, "default", "root.unknown", "testuser", testgroups) |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| app2 := newApplicationWithUserGroup(appID2, "default", "root.unknown", "testuser2", testgroups) |
| if app2 == nil || app2.ApplicationID != appID2 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| app2.queue = queue |
| |
| // Increase testuser |
| app.incUserResourceUsage(res) |
| assertUserResourcesAndGroupResources(t, getUserGroup("testuser", testgroups), res, nil, 0) |
| assertUserResourcesAndGroupResources(t, getUserGroup("testuser2", testgroups), nil, nil, 0) |
| |
| // Increase both testuser and testuser2 with the same group testgroup |
| app.incUserResourceUsage(res) |
| app2.incUserResourceUsage(resources.Multiply(res, 2)) |
| assertUserResourcesAndGroupResources(t, getUserGroup("testuser", testgroups), resources.Multiply(res, 2), nil, 0) |
| assertUserResourcesAndGroupResources(t, getUserGroup("testuser2", testgroups), resources.Multiply(res, 2), nil, 0) |
| |
| // Increase nil |
| app.decUserResourceUsage(nil, false) |
| assertUserResourcesAndGroupResources(t, getUserGroup("testuser", testgroups), resources.Multiply(res, 2), nil, 0) |
| assertUserResourcesAndGroupResources(t, getUserGroup("testuser2", testgroups), resources.Multiply(res, 2), nil, 0) |
| |
| // Decrease testuser |
| app.decUserResourceUsage(res, false) |
| assertUserResourcesAndGroupResources(t, getUserGroup("testuser", testgroups), res, nil, 0) |
| assertUserResourcesAndGroupResources(t, getUserGroup("testuser2", testgroups), resources.Multiply(res, 2), nil, 0) |
| |
| // Decrease testuser2 |
| app2.decUserResourceUsage(res, false) |
| assertUserResourcesAndGroupResources(t, getUserGroup("testuser", testgroups), res, nil, 0) |
| assertUserResourcesAndGroupResources(t, getUserGroup("testuser2", testgroups), res, nil, 0) |
| |
| // Decrease testuser and testuser2 to 0 |
| app.decUserResourceUsage(res, false) |
| app2.decUserResourceUsage(res, false) |
| assertUserResourcesAndGroupResources(t, getUserGroup("testuser", testgroups), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), nil, 0) |
| } |
| |
| func TestGetAllRequests(t *testing.T) { |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) |
| ask := newAllocationAsk(aKey, appID1, res) |
| app := newApplication(appID1, "default", "root.unknown") |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| assert.Assert(t, len(app.getAllRequestsInternal()) == 0, "App should have no requests yet") |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "No error expected when adding an ask") |
| assert.Assert(t, len(app.getAllRequestsInternal()) == 1, "App should have only one request") |
| assert.Equal(t, app.getAllRequestsInternal()[0], ask, "Unexpected request found in the app") |
| } |
| |
| func TestGetQueueNameAfterUnsetQueue(t *testing.T) { |
| app := newApplication(appID1, "default", "root.unknown") |
| assert.Equal(t, app.GetQueuePath(), "root.unknown") |
| |
| // the queue is reset to nil but GetQueuePath should work well |
| app.UnSetQueue() |
| assert.Assert(t, app.queue == nil) |
| assert.Equal(t, app.GetQueuePath(), "root.unknown") |
| } |
| |
| func TestFinishedTime(t *testing.T) { |
| app := newApplication(appID1, "default", "root.unknown") |
| assert.Assert(t, app.finishedTime.IsZero()) |
| assert.Assert(t, app.FinishedTime().IsZero()) |
| |
| // Don't need sleep here, anytime finished, we will set finishedTime for now |
| app.UnSetQueue() |
| assert.Assert(t, !app.finishedTime.IsZero()) |
| assert.Assert(t, !app.FinishedTime().IsZero()) |
| |
| // when app is rejected, finishedTime is rejectedTime |
| app1 := newApplication(appID1, "default", "root.unknown") |
| app1.SetState(Rejected.String()) |
| assert.Assert(t, !app.finishedTime.IsZero()) |
| assert.Assert(t, !app.FinishedTime().IsZero()) |
| } |
| |
| func TestCanReplace(t *testing.T) { |
| app := newApplication(appID1, "default", "root.unknown") |
| resMap := map[string]string{"memory": "100", "vcores": "10"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "Unexpected error when creating resource from map") |
| |
| tg1 := "available" |
| tests := []struct { |
| name string |
| ask *AllocationAsk |
| want bool |
| }{ |
| {"nil", nil, false}, |
| {"placeholder", newAllocationAskTG(aKey, appID1, tg1, res), false}, |
| {"no TG", newAllocationAsk(aKey, appID1, res), false}, |
| {"no placeholder data", newAllocationAskAll(aKey, appID1, tg1, res, false, 0), false}, |
| } |
| for _, tt := range tests { |
| t.Run(tt.name, func(t *testing.T) { |
| assert.Equal(t, tt.want, app.canReplace(tt.ask), "unexpected replacement") |
| }) |
| } |
| // add the placeholder data |
| // available tg has one replacement open |
| app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg1, res)) |
| // unavailable tg has NO replacement open (replaced) |
| tg2 := "unavailable" |
| app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg2, res)) |
| app.placeholderData[tg2].Replaced++ |
| // unavailable tg has NO replacement open (timedout) |
| tg3 := "timedout" |
| app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg3, res)) |
| app.placeholderData[tg3].TimedOut++ |
| tests = []struct { |
| name string |
| ask *AllocationAsk |
| want bool |
| }{ |
| {"no TG", newAllocationAsk(aKey, appID1, res), false}, |
| {"TG mismatch", newAllocationAskAll(aKey, appID1, "unknown", res, false, 0), false}, |
| {"TG placeholder used", newAllocationAskAll(aKey, appID1, tg2, res, false, 0), false}, |
| {"TG placeholder timed out", newAllocationAskAll(aKey, appID1, tg3, res, false, 0), false}, |
| {"TG placeholder available", newAllocationAskAll(aKey, appID1, tg1, res, false, 0), true}, |
| } |
| for _, tt := range tests { |
| t.Run(tt.name, func(t *testing.T) { |
| assert.Equal(t, tt.want, app.canReplace(tt.ask), "unexpected replacement") |
| }) |
| } |
| } |
| |
| func TestTryAllocateNoRequests(t *testing.T) { |
| node := newNode("node1", map[string]resources.Quantity{"first": 5}) |
| nodeMap := map[string]*Node{"node1": node} |
| iterator := getNodeIteratorFn(node) |
| getNode := func(nodeID string) *Node { |
| return nodeMap[nodeID] |
| } |
| |
| app := newApplication(appID1, "default", "root.unknown") |
| preemptionAttemptsRemaining := 0 |
| alloc := app.tryAllocate(node.GetAvailableResource(), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) |
| assert.Check(t, alloc == nil, "unexpected alloc") |
| } |
| |
| func TestTryAllocateFit(t *testing.T) { |
| node := newNode("node1", map[string]resources.Quantity{"first": 5}) |
| nodeMap := map[string]*Node{"node1": node} |
| iterator := getNodeIteratorFn(node) |
| getNode := func(nodeID string) *Node { |
| return nodeMap[nodeID] |
| } |
| |
| rootQ, err := createRootQueue(map[string]string{"first": "5"}) |
| assert.NilError(t, err) |
| childQ, err := createManagedQueue(rootQ, "child", false, map[string]string{"first": "5"}) |
| assert.NilError(t, err) |
| |
| app := newApplication(appID1, "default", "root.child") |
| app.SetQueue(childQ) |
| childQ.applications[appID1] = app |
| ask := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err) |
| |
| preemptionAttemptsRemaining := 0 |
| alloc := app.tryAllocate(node.GetAvailableResource(), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) |
| assert.Assert(t, alloc != nil, "alloc expected") |
| assert.Equal(t, "node1", alloc.GetNodeID(), "wrong node") |
| } |
| |
| func TestTryAllocatePreemptQueue(t *testing.T) { |
| node := newNode("node1", map[string]resources.Quantity{"first": 20}) |
| nodeMap := map[string]*Node{"node1": node} |
| iterator := getNodeIteratorFn(node) |
| getNode := func(nodeID string) *Node { |
| return nodeMap[nodeID] |
| } |
| |
| rootQ, err := createRootQueue(map[string]string{"first": "20"}) |
| assert.NilError(t, err) |
| parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, map[string]string{"first": "20"}, map[string]string{"first": "10"}) |
| assert.NilError(t, err) |
| childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, nil, map[string]string{"first": "5"}) |
| assert.NilError(t, err) |
| childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, nil, map[string]string{"first": "5"}) |
| assert.NilError(t, err) |
| |
| app1 := newApplication(appID1, "default", "root.parent.child1") |
| app1.SetQueue(childQ1) |
| childQ1.applications[appID1] = app1 |
| ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})) |
| err = app1.AddAllocationAsk(ask1) |
| assert.NilError(t, err) |
| ask2 := newAllocationAsk("alloc2", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})) |
| err = app1.AddAllocationAsk(ask2) |
| assert.NilError(t, err) |
| |
| app2 := newApplication(appID2, "default", "root.parent.child2") |
| app2.SetQueue(childQ2) |
| childQ2.applications[appID2] = app2 |
| ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})) |
| ask3.allowPreemptOther = true |
| err = app2.AddAllocationAsk(ask3) |
| assert.NilError(t, err) |
| |
| preemptionAttemptsRemaining := 10 |
| |
| alloc1 := app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) |
| assert.Assert(t, alloc1 != nil, "alloc1 expected") |
| alloc2 := app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) |
| assert.Assert(t, alloc2 != nil, "alloc2 expected") |
| |
| // on first attempt, not enough time has passed |
| alloc3 := app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) |
| assert.Assert(t, alloc3 == nil, "alloc3 not expected") |
| assert.Assert(t, !alloc2.IsPreempted(), "alloc2 should not have been preempted") |
| |
| // pass the time and try again |
| ask3.createTime = ask3.createTime.Add(-30 * time.Second) |
| alloc3 = app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) |
| assert.Assert(t, alloc3 != nil && alloc3.result == Reserved, "alloc3 should be a reservation") |
| assert.Assert(t, alloc2.IsPreempted(), "alloc2 should have been preempted") |
| } |
| |
| func TestTryAllocatePreemptNode(t *testing.T) { |
| node1 := newNode("node1", map[string]resources.Quantity{"first": 20}) |
| node2 := newNode("node2", map[string]resources.Quantity{"first": 20}) |
| nodeMap := map[string]*Node{"node1": node1, "node2": node2} |
| iterator := getNodeIteratorFn(node1, node2) |
| getNode := func(nodeID string) *Node { |
| return nodeMap[nodeID] |
| } |
| |
| rootQ, err := createRootQueue(map[string]string{"first": "40"}) |
| assert.NilError(t, err) |
| parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true, map[string]string{"first": "20"}, map[string]string{"first": "10"}) |
| assert.NilError(t, err) |
| unlimitedQ, err := createManagedQueueGuaranteed(rootQ, "unlimited", false, nil, nil) |
| assert.NilError(t, err) |
| childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false, nil, map[string]string{"first": "5"}) |
| assert.NilError(t, err) |
| childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false, nil, map[string]string{"first": "5"}) |
| assert.NilError(t, err) |
| |
| app0 := newApplication(appID0, "default", "root.unlimited") |
| app0.SetQueue(unlimitedQ) |
| unlimitedQ.applications[appID0] = app0 |
| ask00 := newAllocationAsk("alloc0-0", appID0, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11})) |
| err = app0.AddAllocationAsk(ask00) |
| assert.NilError(t, err) |
| ask01 := newAllocationAsk("alloc0-1", appID0, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11})) |
| err = app0.AddAllocationAsk(ask01) |
| assert.NilError(t, err) |
| |
| app1 := newApplication(appID1, "default", "root.parent.child1") |
| app1.SetQueue(childQ1) |
| childQ1.applications[appID1] = app1 |
| ask1 := newAllocationAsk("alloc1", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})) |
| err = app1.AddAllocationAsk(ask1) |
| assert.NilError(t, err) |
| ask2 := newAllocationAsk("alloc2", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})) |
| err = app1.AddAllocationAsk(ask2) |
| assert.NilError(t, err) |
| |
| app2 := newApplication(appID2, "default", "root.parent.child2") |
| app2.SetQueue(childQ2) |
| childQ2.applications[appID2] = app2 |
| ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})) |
| ask3.allowPreemptOther = true |
| err = app2.AddAllocationAsk(ask3) |
| assert.NilError(t, err) |
| |
| preemptionAttemptsRemaining := 10 |
| |
| // consume capacity with 'unlimited' app |
| alloc00 := app0.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 40}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) |
| assert.Assert(t, alloc00 != nil, "alloc00 expected") |
| alloc01 := app0.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 39}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) |
| assert.Assert(t, alloc01 != nil, "alloc01 expected") |
| |
| // consume remainder of space but not quota |
| alloc1 := app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 28}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) |
| assert.Assert(t, alloc1 != nil, "alloc1 expected") |
| alloc2 := app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 23}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) |
| assert.Assert(t, alloc2 != nil, "alloc2 expected") |
| |
| // on first attempt, should see a reservation since we're after the reservation timeout |
| ask3.createTime = ask3.createTime.Add(-10 * time.Second) |
| alloc3 := app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) |
| assert.Assert(t, alloc3 != nil, "alloc3 expected") |
| assert.Equal(t, "node1", alloc3.GetNodeID(), "wrong node assignment") |
| assert.Equal(t, Reserved, alloc3.GetResult(), "expected reservation") |
| assert.Assert(t, !alloc2.IsPreempted(), "alloc2 should not have been preempted") |
| err = node1.Reserve(app2, ask3) |
| assert.NilError(t, err) |
| |
| // pass the time and try again |
| ask3.createTime = ask3.createTime.Add(-30 * time.Second) |
| alloc3 = app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) |
| assert.Assert(t, alloc3 != nil, "alloc3 expected") |
| assert.Assert(t, alloc1.IsPreempted(), "alloc1 should have been preempted") |
| } |
| |
| func TestMaxAskPriority(t *testing.T) { |
| app := newApplication(appID1, "default", "root.unknown") |
| if app == nil || app.ApplicationID != appID1 { |
| t.Fatalf("app create failed which should not have %v", app) |
| } |
| |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) |
| |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| |
| // initial state |
| assert.Equal(t, app.GetAskMaxPriority(), configs.MinPriority, "wrong default priority") |
| |
| // p=10 added |
| ask1 := newAllocationAskPriority("prio-10", appID1, res, 10) |
| err = app.AddAllocationAsk(ask1) |
| assert.NilError(t, err, "ask should have been updated on app") |
| assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority after adding p=10") |
| |
| // p=5 added |
| ask2 := newAllocationAskPriority("prio-5", appID1, res, 5) |
| err = app.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "ask should have been added to app") |
| assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority after adding p=5") |
| |
| // p=15 added |
| ask3 := newAllocationAskPriority("prio-15", appID1, res, 15) |
| err = app.AddAllocationAsk(ask3) |
| assert.NilError(t, err, "ask should have been added to app") |
| assert.Equal(t, app.GetAskMaxPriority(), int32(15), "wrong priority after adding p=15") |
| |
| // p=10 removed |
| app.RemoveAllocationAsk(ask1.GetAllocationKey()) |
| assert.Equal(t, app.GetAskMaxPriority(), int32(15), "wrong priority after removing p=10") |
| |
| // p=15 removed |
| app.RemoveAllocationAsk(ask3.GetAllocationKey()) |
| assert.Equal(t, app.GetAskMaxPriority(), int32(5), "wrong priority after removing p=15") |
| |
| // re-add removed asks |
| err = app.AddAllocationAsk(ask1) |
| assert.NilError(t, err, "ask should have been added to app") |
| err = app.AddAllocationAsk(ask3) |
| assert.NilError(t, err, "ask should have been added to app") |
| |
| assert.Equal(t, app.GetAskMaxPriority(), int32(15), "wrong priority after re-adding asks") |
| |
| // update to allocated for p=15 |
| _, err = app.AllocateAsk(ask3.GetAllocationKey()) |
| assert.NilError(t, err, "ask should have been updated") |
| assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority after updating p=15 to allocated") |
| |
| // update to allocated for p=5 |
| _, err = app.AllocateAsk(ask2.GetAllocationKey()) |
| assert.NilError(t, err, "ask should have been updated") |
| assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority after updating p=5 to allocated") |
| |
| // update to unallocated for p=5 |
| _, err = app.DeallocateAsk(ask2.GetAllocationKey()) |
| assert.NilError(t, err, "ask should have been updated") |
| assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority after updating p=5 to unallocated") |
| |
| // update to unallocated for p=15 |
| _, err = app.DeallocateAsk(ask3.GetAllocationKey()) |
| assert.NilError(t, err, "ask should have been updated") |
| assert.Equal(t, app.GetAskMaxPriority(), int32(15), "wrong priority after updating p=15 to unallocated") |
| } |
| |
| func TestAskEvents(t *testing.T) { |
| app := newApplication(appID1, "default", "root.default") |
| // Create event system after new application to avoid new app event. |
| events.Init() |
| eventSystem := events.GetEventSystem().(*events.EventSystemImpl) //nolint:errcheck |
| eventSystem.StartServiceWithPublisher(false) |
| app.disableStateChangeEvents() |
| app.resetAppEvents() |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) |
| ask := newAllocationAsk(aKey, appID1, res) |
| |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err) |
| app.RemoveAllocationAsk(ask.allocationKey) |
| noEvents := uint64(0) |
| err = common.WaitFor(10*time.Millisecond, time.Second, func() bool { |
| noEvents = eventSystem.Store.CountStoredEvents() |
| return noEvents == 3 |
| }) |
| assert.NilError(t, err, "expected 3 events, got %d", noEvents) |
| records := eventSystem.Store.CollectEvents() |
| assert.Equal(t, 3, len(records), "number of events") |
| assert.Equal(t, si.EventRecord_APP, records[1].Type) |
| assert.Equal(t, si.EventRecord_ADD, records[1].EventChangeType) |
| assert.Equal(t, si.EventRecord_APP_REQUEST, records[1].EventChangeDetail) |
| assert.Equal(t, si.EventRecord_APP, records[2].Type) |
| assert.Equal(t, si.EventRecord_REMOVE, records[2].EventChangeType) |
| assert.Equal(t, si.EventRecord_REQUEST_CANCEL, records[2].EventChangeDetail) |
| |
| ask2 := newAllocationAsk("alloc-2", appID1, res) |
| ask3 := newAllocationAsk("alloc-3", appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err) |
| err = app.AddAllocationAsk(ask2) |
| assert.NilError(t, err) |
| err = app.AddAllocationAsk(ask3) |
| assert.NilError(t, err) |
| app.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT) |
| err = common.WaitFor(10*time.Millisecond, time.Second, func() bool { |
| noEvents = eventSystem.Store.CountStoredEvents() |
| return noEvents == 6 |
| }) |
| assert.NilError(t, err, "expected 6 events, got %d", noEvents) |
| records = eventSystem.Store.CollectEvents() |
| refIdsRemoved := make(map[string]int) // order can change due to map iteration |
| assert.Equal(t, 6, len(records), "number of events") |
| assert.Equal(t, si.EventRecord_APP, records[3].Type) |
| assert.Equal(t, si.EventRecord_REMOVE, records[3].EventChangeType) |
| assert.Equal(t, si.EventRecord_REQUEST_TIMEOUT, records[3].EventChangeDetail) |
| refIdsRemoved[records[3].ReferenceID]++ |
| assert.Equal(t, si.EventRecord_APP, records[4].Type) |
| assert.Equal(t, si.EventRecord_REMOVE, records[4].EventChangeType) |
| assert.Equal(t, si.EventRecord_REQUEST_TIMEOUT, records[4].EventChangeDetail) |
| refIdsRemoved[records[4].ReferenceID]++ |
| assert.Equal(t, si.EventRecord_APP, records[5].Type) |
| assert.Equal(t, si.EventRecord_REMOVE, records[5].EventChangeType) |
| assert.Equal(t, si.EventRecord_REQUEST_TIMEOUT, records[5].EventChangeDetail) |
| refIdsRemoved[records[5].ReferenceID]++ |
| assert.Equal(t, 1, refIdsRemoved["alloc-1"]) |
| assert.Equal(t, 1, refIdsRemoved["alloc-2"]) |
| assert.Equal(t, 1, refIdsRemoved["alloc-3"]) |
| } |
| |
| func TestAllocationEvents(t *testing.T) { //nolint:funlen |
| app := newApplication(appID1, "default", "root.default") |
| // Create event system after new application to avoid new app event. |
| events.Init() |
| eventSystem := events.GetEventSystem().(*events.EventSystemImpl) //nolint:errcheck |
| eventSystem.StartServiceWithPublisher(false) |
| app.disableStateChangeEvents() |
| app.resetAppEvents() |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| |
| resMap := map[string]string{"memory": "100", "vcores": "10"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "failed to create resource with error") |
| alloc1 := newAllocation(appID1, nodeID1, res) |
| alloc2 := newAllocation(appID1, nodeID1, res) |
| |
| // add + remove |
| app.AddAllocation(alloc1) |
| app.AddAllocation(alloc2) |
| app.RemoveAllocation(alloc1.GetAllocationKey(), si.TerminationType_STOPPED_BY_RM) |
| app.RemoveAllocation(alloc2.GetAllocationKey(), si.TerminationType_PLACEHOLDER_REPLACED) |
| noEvents := uint64(0) |
| err = common.WaitFor(10*time.Millisecond, time.Second, func() bool { |
| noEvents = eventSystem.Store.CountStoredEvents() |
| return noEvents == 5 |
| }) |
| assert.NilError(t, err, "expected 5 events, got %d", noEvents) |
| records := eventSystem.Store.CollectEvents() |
| |
| assert.Equal(t, 5, len(records), "number of events") |
| assert.Equal(t, si.EventRecord_APP, records[1].Type) |
| assert.Equal(t, si.EventRecord_ADD, records[1].EventChangeType) |
| assert.Equal(t, si.EventRecord_APP_ALLOC, records[1].EventChangeDetail) |
| assert.Equal(t, alloc1.GetAllocationKey(), records[1].ReferenceID) |
| assert.Equal(t, "app-1", records[1].ObjectID) |
| assert.Equal(t, si.EventRecord_APP, records[2].Type) |
| assert.Equal(t, si.EventRecord_ADD, records[2].EventChangeType) |
| assert.Equal(t, si.EventRecord_APP_ALLOC, records[2].EventChangeDetail) |
| assert.Equal(t, alloc2.GetAllocationKey(), records[2].ReferenceID) |
| assert.Equal(t, "app-1", records[2].ObjectID) |
| assert.Equal(t, si.EventRecord_APP, records[3].Type) |
| assert.Equal(t, si.EventRecord_REMOVE, records[3].EventChangeType) |
| assert.Equal(t, si.EventRecord_ALLOC_CANCEL, records[3].EventChangeDetail) |
| assert.Equal(t, alloc1.GetAllocationKey(), records[3].ReferenceID) |
| assert.Equal(t, "app-1", records[3].ObjectID) |
| assert.Equal(t, si.EventRecord_APP, records[4].Type) |
| assert.Equal(t, si.EventRecord_REMOVE, records[4].EventChangeType) |
| assert.Equal(t, si.EventRecord_ALLOC_REPLACED, records[4].EventChangeDetail) |
| assert.Equal(t, alloc2.GetAllocationKey(), records[4].ReferenceID) |
| assert.Equal(t, "app-1", records[4].ObjectID) |
| |
| // add + replace |
| alloc1.placeholder = true |
| app.AddAllocation(alloc1) |
| app.ReplaceAllocation(alloc1.GetAllocationKey()) |
| noEvents = 0 |
| err = common.WaitFor(10*time.Millisecond, time.Second, func() bool { |
| noEvents = eventSystem.Store.CountStoredEvents() |
| return noEvents == 2 |
| }) |
| assert.NilError(t, err, "expected 2 events, got %d", noEvents) |
| records = eventSystem.Store.CollectEvents() |
| assert.Equal(t, 2, len(records), "number of events") |
| assert.Equal(t, si.EventRecord_APP, records[0].Type) |
| assert.Equal(t, si.EventRecord_ADD, records[0].EventChangeType) |
| assert.Equal(t, si.EventRecord_APP_ALLOC, records[0].EventChangeDetail) |
| assert.Equal(t, alloc1.GetAllocationKey(), records[0].ReferenceID) |
| assert.Equal(t, "app-1", records[0].ObjectID) |
| assert.Equal(t, si.EventRecord_APP, records[1].Type) |
| assert.Equal(t, si.EventRecord_REMOVE, records[1].EventChangeType) |
| assert.Equal(t, si.EventRecord_ALLOC_REPLACED, records[1].EventChangeDetail) |
| assert.Equal(t, alloc1.GetAllocationKey(), records[0].ReferenceID) |
| assert.Equal(t, "app-1", records[0].ObjectID) |
| |
| // add + remove all |
| app.AddAllocation(alloc1) |
| app.AddAllocation(alloc2) |
| app.RemoveAllAllocations() |
| err = common.WaitFor(10*time.Millisecond, time.Second, func() bool { |
| noEvents = eventSystem.Store.CountStoredEvents() |
| return noEvents == 4 |
| }) |
| assert.NilError(t, err, "expected 4 events, got %d", noEvents) |
| records = eventSystem.Store.CollectEvents() |
| refIdsRemoved := make(map[string]int) // order can change due to map iteration |
| assert.Equal(t, 4, len(records), "number of events") |
| assert.Equal(t, si.EventRecord_APP, records[0].Type) |
| assert.Equal(t, si.EventRecord_ADD, records[0].EventChangeType) |
| assert.Equal(t, si.EventRecord_APP_ALLOC, records[0].EventChangeDetail) |
| assert.Equal(t, alloc1.GetAllocationKey(), records[0].ReferenceID) |
| assert.Equal(t, "app-1", records[0].ObjectID) |
| assert.Equal(t, si.EventRecord_APP, records[1].Type) |
| assert.Equal(t, si.EventRecord_ADD, records[1].EventChangeType) |
| assert.Equal(t, si.EventRecord_APP_ALLOC, records[1].EventChangeDetail) |
| assert.Equal(t, alloc2.GetAllocationKey(), records[1].ReferenceID) |
| assert.Equal(t, "app-1", records[1].ObjectID) |
| assert.Equal(t, si.EventRecord_APP, records[2].Type) |
| assert.Equal(t, si.EventRecord_REMOVE, records[2].EventChangeType) |
| assert.Equal(t, si.EventRecord_ALLOC_CANCEL, records[2].EventChangeDetail) |
| refIdsRemoved[records[2].ReferenceID]++ |
| assert.Equal(t, "app-1", records[2].ObjectID) |
| assert.Equal(t, si.EventRecord_APP, records[3].Type) |
| assert.Equal(t, si.EventRecord_REMOVE, records[3].EventChangeType) |
| assert.Equal(t, si.EventRecord_ALLOC_CANCEL, records[3].EventChangeDetail) |
| refIdsRemoved[records[3].ReferenceID]++ |
| assert.Equal(t, "app-1", records[3].ObjectID) |
| assert.Equal(t, 1, refIdsRemoved[alloc1.GetAllocationKey()]) |
| assert.Equal(t, 1, refIdsRemoved[alloc2.GetAllocationKey()]) |
| } |
| |
| func TestPlaceholderLargerEvent(t *testing.T) { |
| resMap := map[string]string{"memory": "100", "vcores": "10"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "failed to create resource with error") |
| smallerResMap := map[string]string{"memory": "50", "vcores": "10"} |
| smallerRes, err := resources.NewResourceFromConf(smallerResMap) |
| assert.NilError(t, err, "failed to create resource with error") |
| |
| app := newApplication(appID1, "default", "root.default") |
| // Create event system after new application to avoid new application event. |
| events.Init() |
| eventSystem := events.GetEventSystem().(*events.EventSystemImpl) //nolint:errcheck |
| eventSystem.StartServiceWithPublisher(false) |
| app.disableStateChangeEvents() |
| app.resetAppEvents() |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| // smallerRes < res in the same task group, so delta will be -50 memory |
| alloc1 := newAllocation(appID1, nodeID1, smallerRes) |
| alloc1.placeholder = true |
| alloc1.taskGroupName = "testGroup" |
| app.AddAllocation(alloc1) |
| ask := newAllocationAsk("alloc-0", "app-1", res) |
| ask.taskGroupName = "testGroup" |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err) |
| |
| app.tryPlaceholderAllocate(func() NodeIterator { |
| return nil |
| }, func(s string) *Node { |
| return nil |
| }) |
| |
| noEvents := uint64(0) |
| err = common.WaitFor(10*time.Millisecond, time.Second, func() bool { |
| noEvents = eventSystem.Store.CountStoredEvents() |
| return noEvents == 4 |
| }) |
| assert.NilError(t, err, "expected 4 events, got %d", noEvents) |
| records := eventSystem.Store.CollectEvents() |
| assert.Equal(t, si.EventRecord_REQUEST, records[3].Type) |
| assert.Equal(t, si.EventRecord_NONE, records[3].EventChangeType) |
| assert.Equal(t, si.EventRecord_DETAILS_NONE, records[3].EventChangeDetail) |
| assert.Equal(t, "app-1", records[3].ReferenceID) |
| } |
| |
| func TestRequestDoesNotFitQueueEvents(t *testing.T) { |
| res, err := resources.NewResourceFromConf(map[string]string{"memory": "100", "vcores": "10"}) |
| assert.NilError(t, err) |
| headroom, err := resources.NewResourceFromConf(map[string]string{"memory": "0", "vcores": "0"}) |
| assert.NilError(t, err) |
| ask := newAllocationAsk("alloc-0", "app-1", res) |
| app := newApplication(appID1, "default", "root.default") |
| eventSystem := mock.NewEventSystem() |
| ask.askEvents = newAskEvents(ask, eventSystem) |
| app.disableStateChangeEvents() |
| app.resetAppEvents() |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| sr := sortedRequests{} |
| sr.insert(ask) |
| app.sortedRequests = sr |
| attempts := 0 |
| |
| // try to allocate |
| app.tryAllocate(headroom, true, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) |
| assert.Equal(t, 1, len(eventSystem.Events)) |
| event := eventSystem.Events[0] |
| assert.Equal(t, si.EventRecord_REQUEST, event.Type) |
| assert.Equal(t, si.EventRecord_NONE, event.EventChangeType) |
| assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail) |
| assert.Equal(t, "app-1", event.ReferenceID) |
| assert.Equal(t, "alloc-0", event.ObjectID) |
| assert.Equal(t, "Request 'alloc-0' does not fit in queue 'root.default' (requested map[memory:100 vcores:10], available map[memory:0 vcores:0])", event.Message) |
| |
| // second attempt - no new event |
| app.tryAllocate(headroom, true, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) |
| assert.Equal(t, 1, len(eventSystem.Events)) |
| |
| // third attempt with enough headroom - new event |
| eventSystem.Reset() |
| headroom, err = resources.NewResourceFromConf(map[string]string{"memory": "1000", "vcores": "1000"}) |
| assert.NilError(t, err) |
| app.tryAllocate(headroom, true, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) |
| assert.Equal(t, 1, len(eventSystem.Events)) |
| event = eventSystem.Events[0] |
| assert.Equal(t, si.EventRecord_REQUEST, event.Type) |
| assert.Equal(t, si.EventRecord_NONE, event.EventChangeType) |
| assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail) |
| assert.Equal(t, "app-1", event.ReferenceID) |
| assert.Equal(t, "alloc-0", event.ObjectID) |
| assert.Equal(t, "Request 'alloc-0' has become schedulable in queue 'root.default'", event.Message) |
| } |
| |
| func TestRequestDoesNotFitUserQuotaQueueEvents(t *testing.T) { |
| setupUGM() |
| // create config with resource limits for "testuser" |
| conf := configs.QueueConfig{ |
| Name: "root", |
| Parent: true, |
| SubmitACL: "*", |
| Limits: []configs.Limit{ |
| { |
| Limit: "leaf queue limit", |
| Users: []string{ |
| "testuser", |
| }, |
| MaxResources: map[string]string{ |
| "memory": "1", |
| "vcores": "1", |
| }, |
| }, |
| }, |
| } |
| err := ugm.GetUserManager().UpdateConfig(conf, "root") |
| assert.NilError(t, err) |
| |
| res, err := resources.NewResourceFromConf(map[string]string{"memory": "100", "vcores": "10"}) |
| assert.NilError(t, err) |
| headroom, err := resources.NewResourceFromConf(map[string]string{"memory": "1000", "vcores": "1000"}) |
| assert.NilError(t, err) |
| ask := newAllocationAsk("alloc-0", "app-1", res) |
| app := newApplication(appID1, "default", "root") |
| eventSystem := mock.NewEventSystem() |
| ask.askEvents = newAskEvents(ask, eventSystem) |
| app.disableStateChangeEvents() |
| app.resetAppEvents() |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| sr := sortedRequests{} |
| sr.insert(ask) |
| app.sortedRequests = sr |
| attempts := 0 |
| |
| // try to allocate |
| app.tryAllocate(headroom, true, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) |
| assert.Equal(t, 1, len(eventSystem.Events)) |
| event := eventSystem.Events[0] |
| assert.Equal(t, si.EventRecord_REQUEST, event.Type) |
| assert.Equal(t, si.EventRecord_NONE, event.EventChangeType) |
| assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail) |
| assert.Equal(t, "app-1", event.ReferenceID) |
| assert.Equal(t, "alloc-0", event.ObjectID) |
| assert.Equal(t, "Request 'alloc-0' exceeds the available user quota (requested map[memory:100 vcores:10], available map[memory:1 vcores:1])", event.Message) |
| |
| // second attempt - no new event |
| app.tryAllocate(headroom, true, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) |
| assert.Equal(t, 1, len(eventSystem.Events)) |
| |
| // third attempt with enough headroom - new event |
| eventSystem.Reset() |
| conf.Limits[0].MaxResources = nil |
| err = ugm.GetUserManager().UpdateConfig(conf, "root") |
| assert.NilError(t, err) |
| app.tryAllocate(headroom, true, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) |
| assert.Equal(t, 1, len(eventSystem.Events)) |
| event = eventSystem.Events[0] |
| assert.Equal(t, si.EventRecord_REQUEST, event.Type) |
| assert.Equal(t, si.EventRecord_NONE, event.EventChangeType) |
| assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail) |
| assert.Equal(t, "app-1", event.ReferenceID) |
| assert.Equal(t, "alloc-0", event.ObjectID) |
| assert.Equal(t, "Request 'alloc-0' fits in the available user quota", event.Message) |
| } |
| |
| func TestAllocationFailures(t *testing.T) { |
| setupUGM() |
| |
| res, err := resources.NewResourceFromConf(map[string]string{"memory": "100", "vcores": "10"}) |
| assert.NilError(t, err) |
| headroom, err := resources.NewResourceFromConf(map[string]string{"memory": "0", "vcores": "0"}) |
| assert.NilError(t, err) |
| ask := newAllocationAsk("alloc-0", "app-1", res) |
| app := newApplication(appID1, "default", "root") |
| queue, err := createRootQueue(nil) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| sr := sortedRequests{} |
| sr.insert(ask) |
| app.sortedRequests = sr |
| attempts := 0 |
| |
| // case #1: not enough queue headroom |
| app.tryAllocate(headroom, true, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) |
| assert.Equal(t, 1, len(ask.allocLog)) |
| assert.Equal(t, int32(1), ask.allocLog[NotEnoughQueueQuota].Count) |
| |
| // case #2: not enough user quota |
| // create config with resource limits for "testuser" |
| conf := configs.QueueConfig{ |
| Name: "root", |
| Parent: true, |
| SubmitACL: "*", |
| Limits: []configs.Limit{ |
| { |
| Limit: "leaf queue limit", |
| Users: []string{ |
| "testuser", |
| }, |
| MaxResources: map[string]string{ |
| "memory": "1", |
| "vcores": "1", |
| }, |
| }, |
| }, |
| } |
| err = ugm.GetUserManager().UpdateConfig(conf, "root") |
| assert.NilError(t, err) |
| headroom, err = resources.NewResourceFromConf(map[string]string{"memory": "1000", "vcores": "1000"}) |
| assert.NilError(t, err) |
| app.tryAllocate(headroom, true, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) |
| assert.Equal(t, 2, len(ask.allocLog)) |
| assert.Equal(t, int32(1), ask.allocLog[NotEnoughUserQuota].Count) |
| } |
| |
| func TestGetOutstandingRequests(t *testing.T) { |
| // Create a sample Resource and AllocationAsk |
| resMap := map[string]string{"memory": "100", "vcores": "10"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "failed to create resource with error") |
| |
| allocationAsk1 := newAllocationAsk("alloc-1", "app-1", res) |
| allocationAsk2 := newAllocationAsk("alloc-2", "app-1", res) |
| allocationAsk1.SetSchedulingAttempted(true) |
| allocationAsk2.SetSchedulingAttempted(true) |
| |
| // Create an Application instance |
| app := &Application{ |
| ApplicationID: "app-1", |
| queuePath: "default", |
| } |
| |
| app.user = security.UserGroup{ |
| User: "user1", |
| Groups: []string{"group1"}, |
| } |
| |
| // Set up the Application's sortedRequests with AllocationAsks |
| sr := sortedRequests{} |
| sr.insert(allocationAsk1) |
| sr.insert(allocationAsk2) |
| app.sortedRequests = sr |
| |
| // Test Case 1: queueHeadroom meets, but userHeadroom does not |
| queueHeadroom, err := resources.NewResourceFromConf(map[string]string{"memory": "250", "vcores": "25"}) |
| assert.NilError(t, err, "failed to create queue headroom resource with error") |
| userHeadroom, err := resources.NewResourceFromConf(map[string]string{"memory": "50", "vcores": "5"}) |
| assert.NilError(t, err, "failed to create user headroom resource with error") |
| total1 := []*AllocationAsk{} |
| app.getOutstandingRequests(queueHeadroom, userHeadroom, &total1) |
| assert.Equal(t, 0, len(total1), "expected one outstanding request for TestCase 1") |
| |
| // Test Case 2: Both queueHeadroom and userHeadroom meet |
| queueHeadroom2, err := resources.NewResourceFromConf(map[string]string{"memory": "250", "vcores": "25"}) |
| assert.NilError(t, err, "failed to create queue headroom resource with error") |
| userHeadroom2, err := resources.NewResourceFromConf(map[string]string{"memory": "250", "vcores": "25"}) |
| assert.NilError(t, err, "failed to create user headroom resource with error") |
| total2 := []*AllocationAsk{} |
| app.getOutstandingRequests(queueHeadroom2, userHeadroom2, &total2) |
| assert.Equal(t, 2, len(total2), "expected two outstanding requests for TestCase 2") |
| |
| // Test Case 3: queueHeadroom does not meet, but userHeadroom meets |
| queueHeadroom3, err := resources.NewResourceFromConf(map[string]string{"memory": "50", "vcores": "5"}) |
| assert.NilError(t, err, "failed to create queue headroom resource with error") |
| userHeadroom3, err := resources.NewResourceFromConf(map[string]string{"memory": "250", "vcores": "25"}) |
| assert.NilError(t, err, "failed to create user headroom resource with error") |
| total3 := []*AllocationAsk{} |
| app.getOutstandingRequests(queueHeadroom3, userHeadroom3, &total3) |
| assert.Equal(t, 0, len(total3), "expected one outstanding request for TestCase 3") |
| |
| // Test Case 4: Neither queueHeadroom nor userHeadroom meets |
| queueHeadroom4, err := resources.NewResourceFromConf(map[string]string{"memory": "50", "vcores": "5"}) |
| assert.NilError(t, err, "failed to create queue headroom resource with error") |
| userHeadroom4, err := resources.NewResourceFromConf(map[string]string{"memory": "80", "vcores": "8"}) |
| assert.NilError(t, err, "failed to create user headroom resource with error") |
| total4 := []*AllocationAsk{} |
| app.getOutstandingRequests(queueHeadroom4, userHeadroom4, &total4) |
| assert.Equal(t, 0, len(total4), "expected no outstanding requests for TestCase 4") |
| } |
| |
| func TestGetOutstandingRequests_NoSchedulingAttempt(t *testing.T) { |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1}) |
| |
| allocationAsk1 := newAllocationAsk("alloc-1", "app-1", res) |
| allocationAsk2 := newAllocationAsk("alloc-2", "app-1", res) |
| allocationAsk3 := newAllocationAsk("alloc-3", "app-1", res) |
| allocationAsk4 := newAllocationAsk("alloc-4", "app-1", res) |
| allocationAsk2.SetSchedulingAttempted(true) |
| allocationAsk4.SetSchedulingAttempted(true) |
| app := &Application{ |
| ApplicationID: "app-1", |
| queuePath: "default", |
| } |
| sr := sortedRequests{} |
| sr.insert(allocationAsk1) |
| sr.insert(allocationAsk2) |
| sr.insert(allocationAsk3) |
| sr.insert(allocationAsk4) |
| app.sortedRequests = sr |
| |
| var total []*AllocationAsk |
| headroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10}) |
| userHeadroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 8}) |
| app.getOutstandingRequests(headroom, userHeadroom, &total) |
| |
| assert.Equal(t, 2, len(total)) |
| assert.Equal(t, "alloc-2", total[0].allocationKey) |
| assert.Equal(t, "alloc-4", total[1].allocationKey) |
| } |
| |
| func TestGetOutstandingRequests_RequestTriggeredPreemptionHasRequiredNode(t *testing.T) { |
| // Test that we decrease headrooms even if the requests have triggered upscaling or |
| // the ask is a DaemonSet pod (requiredNode != "") |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1}) |
| |
| allocationAsk1 := newAllocationAsk("alloc-1", "app-1", res) |
| allocationAsk2 := newAllocationAsk("alloc-2", "app-1", res) |
| allocationAsk3 := newAllocationAsk("alloc-3", "app-1", res) |
| allocationAsk4 := newAllocationAsk("alloc-4", "app-1", res) |
| allocationAsk1.SetSchedulingAttempted(true) |
| allocationAsk2.SetSchedulingAttempted(true) |
| allocationAsk3.SetSchedulingAttempted(true) |
| allocationAsk4.SetSchedulingAttempted(true) // hasn't triggered scaling, no required node --> picked |
| allocationAsk1.SetScaleUpTriggered(true) // triggered scaling, no required node --> not selected |
| allocationAsk2.SetScaleUpTriggered(true) // triggered scaling, has required node --> not selected |
| allocationAsk2.SetRequiredNode("node-1") |
| allocationAsk3.SetRequiredNode("node-1") // hasn't triggered scaling, has required node --> not selected |
| |
| app := &Application{ |
| ApplicationID: "app-1", |
| queuePath: "default", |
| } |
| sr := sortedRequests{} |
| sr.insert(allocationAsk1) |
| sr.insert(allocationAsk2) |
| sr.insert(allocationAsk3) |
| sr.insert(allocationAsk4) |
| app.sortedRequests = sr |
| |
| var total []*AllocationAsk |
| headroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10}) |
| userHeadroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 8}) |
| app.getOutstandingRequests(headroom, userHeadroom, &total) |
| |
| assert.Equal(t, 1, len(total)) |
| assert.Equal(t, "alloc-4", total[0].allocationKey) |
| } |
| |
| func TestGetOutstandingRequests_AskReplaceable(t *testing.T) { |
| res := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1}) |
| |
| allocationAsk1 := newAllocationAsk("alloc-1", "app-1", res) // replaceable |
| allocationAsk2 := newAllocationAsk("alloc-2", "app-1", res) // replaceable |
| allocationAsk3 := newAllocationAsk("alloc-3", "app-1", res) // non-replaceable |
| allocationAsk1.SetSchedulingAttempted(true) |
| allocationAsk2.SetSchedulingAttempted(true) |
| allocationAsk3.SetSchedulingAttempted(true) |
| allocationAsk1.taskGroupName = "testgroup" |
| allocationAsk2.taskGroupName = "testgroup" |
| |
| app := &Application{ |
| ApplicationID: "app-1", |
| queuePath: "default", |
| } |
| sr := sortedRequests{} |
| sr.insert(allocationAsk1) |
| sr.insert(allocationAsk2) |
| sr.insert(allocationAsk3) |
| app.sortedRequests = sr |
| app.addPlaceholderDataWithLocking(allocationAsk1) |
| app.addPlaceholderDataWithLocking(allocationAsk2) |
| |
| var total []*AllocationAsk |
| headroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10}) |
| userHeadroom := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 8}) |
| app.getOutstandingRequests(headroom, userHeadroom, &total) |
| |
| assert.Equal(t, 1, len(total)) |
| assert.Equal(t, "alloc-3", total[0].allocationKey) |
| } |
| |
| func TestGetRateLimitedAppLog(t *testing.T) { |
| l := getRateLimitedAppLog() |
| assert.Check(t, l != nil) |
| } |
| |
| func TestTryAllocateWithReservedHeadRoomChecking(t *testing.T) { |
| defer func() { |
| if r := recover(); r != nil { |
| t.Fatalf("reserved headroom test regression: %v", r) |
| } |
| }() |
| |
| res, err := resources.NewResourceFromConf(map[string]string{"memory": "2G"}) |
| assert.NilError(t, err, "failed to create basic resource") |
| var headRoom *resources.Resource |
| headRoom, err = resources.NewResourceFromConf(map[string]string{"memory": "1G"}) |
| assert.NilError(t, err, "failed to create basic resource") |
| |
| app := newApplication(appID1, "default", "root") |
| ask := newAllocationAsk(aKey, appID1, res) |
| var queue *Queue |
| queue, err = createRootQueue(map[string]string{"memory": "1G"}) |
| assert.NilError(t, err, "queue create failed") |
| app.queue = queue |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should have been added to app") |
| |
| node1 := newNodeRes(nodeID1, res) |
| node2 := newNodeRes(nodeID2, res) |
| // reserve that works |
| err = app.Reserve(node1, ask) |
| assert.NilError(t, err, "reservation should not have failed") |
| |
| iter := getNodeIteratorFn(node1, node2) |
| alloc := app.tryReservedAllocate(headRoom, iter) |
| assert.Assert(t, alloc == nil, "Alloc is expected to be nil due to insufficient headroom") |
| } |
| |
| func TestUpdateRunnableStatus(t *testing.T) { |
| app := newApplication(appID0, "default", "root.unknown") |
| assert.Assert(t, app.runnableInQueue) |
| assert.Assert(t, app.runnableByUserLimit) |
| eventSystem := mock.NewEventSystem() |
| app.appEvents = newApplicationEvents(app, eventSystem) |
| |
| // App runnable - no events |
| app.updateRunnableStatus(true, true) |
| assert.Equal(t, 0, len(eventSystem.Events)) |
| |
| // App not runnable in queue |
| eventSystem.Reset() |
| app.updateRunnableStatus(false, true) |
| assert.Equal(t, 1, len(eventSystem.Events)) |
| assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUEUE, eventSystem.Events[0].EventChangeDetail) |
| // Try again - no new events |
| app.updateRunnableStatus(false, true) |
| assert.Equal(t, 1, len(eventSystem.Events)) |
| |
| // App becomes runnable in queue |
| eventSystem.Reset() |
| app.updateRunnableStatus(true, true) |
| assert.Equal(t, 1, len(eventSystem.Events)) |
| assert.Equal(t, si.EventRecord_APP_RUNNABLE_QUEUE, eventSystem.Events[0].EventChangeDetail) |
| |
| // Try again - no new events |
| app.updateRunnableStatus(true, true) |
| assert.Equal(t, 1, len(eventSystem.Events)) |
| |
| // App not runnable by UG quota |
| eventSystem.Reset() |
| app.updateRunnableStatus(true, false) |
| assert.Equal(t, 1, len(eventSystem.Events)) |
| assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUOTA, eventSystem.Events[0].EventChangeDetail) |
| // Try again - no new events |
| app.updateRunnableStatus(true, false) |
| assert.Equal(t, 1, len(eventSystem.Events)) |
| |
| // App becomes runnable by user quota |
| eventSystem.Reset() |
| app.updateRunnableStatus(true, true) |
| assert.Equal(t, si.EventRecord_APP_RUNNABLE_QUOTA, eventSystem.Events[0].EventChangeDetail) |
| // Try again - no new events |
| app.updateRunnableStatus(true, true) |
| assert.Equal(t, 1, len(eventSystem.Events)) |
| |
| // Both false |
| eventSystem.Reset() |
| app.updateRunnableStatus(false, false) |
| assert.Equal(t, 2, len(eventSystem.Events)) |
| assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUEUE, eventSystem.Events[0].EventChangeDetail) |
| assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUOTA, eventSystem.Events[1].EventChangeDetail) |
| } |
| |
| func TestGetMaxResourceFromTag(t *testing.T) { |
| app := newApplication(appID0, "default", "root.unknown") |
| testGetResourceFromTag(t, siCommon.AppTagNamespaceResourceQuota, app.tags, app.GetMaxResource) |
| } |
| |
| func TestGuaranteedResourceFromTag(t *testing.T) { |
| app := newApplication(appID0, "default", "root.unknown") |
| testGetResourceFromTag(t, siCommon.AppTagNamespaceResourceGuaranteed, app.tags, app.GetGuaranteedResource) |
| } |
| |
| func testGetResourceFromTag(t *testing.T, tagName string, tags map[string]string, getResource func() *resources.Resource) { |
| assert.Equal(t, 0, len(tags), "tags are not empty") |
| |
| // no value for tag |
| res := getResource() |
| assert.Assert(t, res == nil, "unexpected resource") |
| |
| // empty value |
| tags[tagName] = "" |
| res = getResource() |
| assert.Assert(t, res == nil, "unexpected resource") |
| |
| // valid value |
| tags[tagName] = "{\"resources\":{\"vcore\":{\"value\":111}}}" |
| res = getResource() |
| assert.Assert(t, res != nil) |
| assert.Equal(t, 1, len(res.Resources)) |
| assert.Equal(t, resources.Quantity(111), res.Resources["vcore"]) |
| |
| // zero |
| tags[tagName] = "{\"resources\":{\"vcore\":{\"value\":0}}}" |
| res = getResource() |
| assert.Assert(t, res == nil) |
| |
| // negative |
| tags[tagName] = "{\"resources\":{\"vcore\":{\"value\":-12}}}" |
| res = getResource() |
| assert.Assert(t, res == nil, "unexpected resource") |
| |
| // invalid value |
| tags[tagName] = "{xyz}" |
| res = getResource() |
| assert.Assert(t, res == nil, "unexpected resource") |
| } |
| |
| func (sa *Application) addPlaceholderDataWithLocking(ask *AllocationAsk) { |
| sa.Lock() |
| defer sa.Unlock() |
| sa.addPlaceholderData(ask) |
| } |
| |
| func (sa *Application) getPlaceholderTimer() *time.Timer { |
| sa.RLock() |
| defer sa.RUnlock() |
| return sa.placeholderTimer |
| } |
| |
| func (sa *Application) handleApplicationEventWithLocking(event applicationEvent) error { |
| sa.Lock() |
| defer sa.Unlock() |
| return sa.HandleApplicationEvent(event) |
| } |
| |
| func (sa *Application) handleApplicationEventWithInfoLocking(event applicationEvent, info string) error { |
| sa.Lock() |
| defer sa.Unlock() |
| return sa.HandleApplicationEventWithInfo(event, info) |
| } |
| |
| func (sa *Application) disableStateChangeEvents() { |
| sa.Lock() |
| defer sa.Unlock() |
| sa.sendStateChangeEvents = false |
| } |
| |
| func (sa *Application) resetAppEvents() { |
| sa.Lock() |
| defer sa.Unlock() |
| sa.appEvents = newApplicationEvents(sa, events.GetEventSystem()) |
| } |