| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package scheduler |
| |
| import ( |
| "fmt" |
| "strconv" |
| "testing" |
| "time" |
| |
| "gotest.tools/v3/assert" |
| |
| "github.com/apache/yunikorn-core/pkg/common" |
| "github.com/apache/yunikorn-core/pkg/common/configs" |
| "github.com/apache/yunikorn-core/pkg/common/resources" |
| "github.com/apache/yunikorn-core/pkg/common/security" |
| "github.com/apache/yunikorn-core/pkg/events" |
| "github.com/apache/yunikorn-core/pkg/mock" |
| "github.com/apache/yunikorn-core/pkg/plugins" |
| "github.com/apache/yunikorn-core/pkg/rmproxy/rmevent" |
| "github.com/apache/yunikorn-core/pkg/scheduler/objects" |
| "github.com/apache/yunikorn-core/pkg/scheduler/policies" |
| "github.com/apache/yunikorn-core/pkg/scheduler/ugm" |
| siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" |
| "github.com/apache/yunikorn-scheduler-interface/lib/go/si" |
| ) |
| |
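| // setupUGM resets the user group manager so every test starts from a clean usage state: |
| // all user and group trackers and any configured limits are cleared. |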
| func setupUGM() { |
| userManager := ugm.GetUserManager() |
| userManager.ClearUserTrackers() |
| userManager.ClearGroupTrackers() |
| userManager.ClearConfigLimits() |
| } |
| |
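| // setupNode adds a node with the given max resource to the partition and returns it, |
| // failing the test if the add errors or the node cannot be found afterwards. |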
| func setupNode(t *testing.T, nodeID string, partition *PartitionContext, nodeRes *resources.Resource) *objects.Node { |
| err := partition.AddNode(newNodeMaxResource(nodeID, nodeRes), nil) |
| assert.NilError(t, err, "test "+nodeID+" add failed unexpected") |
| node := partition.GetNode(nodeID) |
| if node == nil { |
| t.Fatal("new node was not found on the partition") |
| } |
| return node |
| } |
| |
| func TestNewPartition(t *testing.T) { |
| partition, err := newPartitionContext(configs.PartitionConfig{}, "", nil) |
| if err == nil || partition != nil { |
| t.Fatal("nil inputs should not have returned partition") |
| } |
| conf := configs.PartitionConfig{Name: "test"} |
| partition, err = newPartitionContext(conf, "", nil) |
| if err == nil || partition != nil { |
| t.Fatal("named partition without RM should not have returned partition") |
| } |
| partition, err = newPartitionContext(conf, "test", &ClusterContext{}) |
| if err == nil || partition != nil { |
| t.Fatal("partition without root queue should not have returned partition") |
| } |
| |
| conf = configs.PartitionConfig{ |
| Name: "test", |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "test", |
| Parent: true, |
| }, |
| }, |
| } |
| partition, err = newPartitionContext(conf, "test", &ClusterContext{}) |
| if err == nil || partition != nil { |
| t.Fatal("partition without root queue should not have returned partition") |
| } |
| |
| conf = configs.PartitionConfig{ |
| Name: "test", |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "root", |
| Parent: true, |
| SubmitACL: "*", |
| Queues: nil, |
| Limits: []configs.Limit{ |
| { |
| Limit: "root queue limit", |
| Users: []string{ |
| "user1", |
| }, |
| Groups: []string{ |
| "group1", |
| }, |
| MaxResources: map[string]string{ |
| "memory": "10", |
| "vcores": "10", |
| }, |
| MaxApplications: 2, |
| }, |
| }, |
| }, |
| }, |
| } |
| partition, err = newPartitionContext(conf, "test", &ClusterContext{}) |
| assert.NilError(t, err, "partition create should not have failed with error") |
| if partition.root.QueuePath != "root" { |
| t.Fatal("partition root queue not set as expected") |
| } |
| } |
| |
| func TestNewWithPlacement(t *testing.T) { |
| confWith := configs.PartitionConfig{ |
| Name: "test", |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "root", |
| Parent: true, |
| SubmitACL: "*", |
| Queues: nil, |
| }, |
| }, |
| PlacementRules: []configs.PlacementRule{ |
| { |
| Name: "provided", |
| Create: true, |
| }, |
| }, |
| Limits: nil, |
| NodeSortPolicy: configs.NodeSortingPolicy{}, |
| } |
| partition, err := newPartitionContext(confWith, rmID, nil) |
| assert.NilError(t, err, "test partition create failed with error") |
| |
| // add a rule and check if it is updated |
| confWith = configs.PartitionConfig{ |
| Name: "test", |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "root", |
| Parent: true, |
| SubmitACL: "*", |
| Queues: nil, |
| }, |
| }, |
| PlacementRules: []configs.PlacementRule{ |
| { |
| Name: "provided", |
| Create: false, |
| }, |
| { |
| Name: "user", |
| Create: true, |
| }, |
| }, |
| } |
| err = partition.updatePartitionDetails(confWith) |
| assert.NilError(t, err, "update partition failed unexpected with error") |
| |
| // update to turn off placement manager |
| conf := configs.PartitionConfig{ |
| Name: "test", |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "root", |
| Parent: true, |
| SubmitACL: "*", |
| Queues: nil, |
| }, |
| }, |
| } |
| err = partition.updatePartitionDetails(conf) |
| assert.NilError(t, err, "update partition failed unexpected with error") |
| |
| // set the old config back this should turn on the placement again |
| err = partition.updatePartitionDetails(confWith) |
| assert.NilError(t, err, "update partition failed unexpected with error") |
| } |
| |
| func TestAddNode(t *testing.T) { |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "test partition create failed with error") |
| err = partition.AddNode(nil, nil) |
| if err == nil { |
| t.Fatal("nil node add did not return error") |
| } |
| assert.Equal(t, 0, partition.nodes.GetNodeCount(), "nil node should not be added") |
| node := newNodeMaxResource("test1", resources.NewResource()) |
| // stop the partition: node add should be rejected |
| partition.markPartitionForRemoval() |
| assert.Assert(t, partition.isDraining(), "partition should have been marked as draining") |
| err = partition.AddNode(node, nil) |
| if err == nil { |
| t.Error("test node add to draining partition should have failed") |
| } |
| assert.Equal(t, partition.nodes.GetNodeCount(), 0, "node list not correct") |
| |
| // reset the state (hard no checks) |
| partition.stateMachine.SetState(objects.Active.String()) |
| err = partition.AddNode(node, nil) |
| assert.NilError(t, err, "test node add failed unexpected") |
| assert.Equal(t, partition.nodes.GetNodeCount(), 1, "node list not correct") |
| // add the same node again: this must fail and nothing changes |
| err = partition.AddNode(node, nil) |
| if err == nil { |
| t.Fatal("add same test node worked unexpected") |
| } |
| assert.Equal(t, partition.nodes.GetNodeCount(), 1, "node list not correct") |
| err = partition.AddNode(newNodeMaxResource("test2", resources.NewResource()), nil) |
| assert.NilError(t, err, "test node2 add failed unexpected") |
| assert.Equal(t, partition.nodes.GetNodeCount(), 2, "node list not correct") |
| } |
| |
| func TestAddNodeWithAllocations(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| q := partition.GetQueue(defQueue) |
| if q == nil { |
| t.Fatal("expected default queue not found") |
| } |
| |
| // add a new app |
| app := newApplication(appID1, "default", defQueue) |
| appRes := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000}) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| |
| // add a node with allocations |
| nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000}) |
| node := newNodeMaxResource(nodeID1, nodeRes) |
| |
| // fail with an unknown app |
| ask := newAllocationAsk("alloc-1", "unknown", appRes) |
| alloc := objects.NewAllocation(nodeID1, ask) |
| allocs := []*objects.Allocation{alloc} |
| err = partition.AddNode(node, allocs) |
| if err == nil { |
| t.Errorf("add node to partition should have failed (app missing)") |
| } |
| assert.Equal(t, partition.nodes.GetNodeCount(), 0, "error returned but node still added to the partition (app)") |
| |
| // retry with an alloc for the application that is registered on the partition |
| ask = newAllocationAsk("alloc-1", appID1, appRes) |
| alloc = objects.NewAllocation(nodeID1, ask) |
| allocs = []*objects.Allocation{alloc} |
| // add the node: this must work |
| err = partition.AddNode(node, allocs) |
| // check the partition |
| assert.NilError(t, err, "add node to partition should not have failed") |
| assert.Equal(t, partition.nodes.GetNodeCount(), 1, "no error returned but node not added to the partition") |
| assert.Assert(t, resources.Equals(nodeRes, partition.GetTotalPartitionResource()), "add node to partition did not update total resources expected %v got %v", nodeRes, partition.GetTotalPartitionResource()) |
| assert.Equal(t, partition.GetTotalAllocationCount(), 1, "add node to partition did not add allocation") |
| |
| // check the queue usage |
| assert.Assert(t, resources.Equals(q.GetAllocatedResource(), appRes), "add node to partition did not update queue as expected") |
| assertLimits(t, getTestUserGroup(), appRes) |
| } |
| |
| func TestRemoveNode(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "test partition create failed with error") |
| err = partition.AddNode(newNodeMaxResource("test", resources.NewResource()), nil) |
| assert.NilError(t, err, "test node add failed unexpected") |
| assert.Equal(t, 1, partition.nodes.GetNodeCount(), "node list not correct") |
| |
| // remove a non-existing node |
| _, _ = partition.removeNode("") |
| assert.Equal(t, 1, partition.nodes.GetNodeCount(), "nil node should not remove anything") |
| _, _ = partition.removeNode("does not exist") |
| assert.Equal(t, 1, partition.nodes.GetNodeCount(), "non existing node was removed") |
| |
| _, _ = partition.removeNode("test") |
| assert.Equal(t, 0, partition.nodes.GetNodeCount(), "node was not removed") |
| } |
| |
| func TestRemoveNodeWithAllocations(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| // add a new app |
| app := newApplication(appID1, "default", defQueue) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| |
| // add a node with allocations: must have the correct app added already |
| nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000000}) |
| node := newNodeMaxResource(nodeID1, nodeRes) |
| appRes := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000}) |
| ask := newAllocationAsk("alloc-1", appID1, appRes) |
| alloc := objects.NewAllocation(nodeID1, ask) |
| allocAllocationKey := alloc.GetAllocationKey() |
| allocs := []*objects.Allocation{alloc} |
| err = partition.AddNode(node, allocs) |
| assert.NilError(t, err, "add node to partition should not have failed") |
| // get what was allocated |
| allocated := node.GetAllAllocations() |
| assert.Equal(t, 1, len(allocated), "allocation not added correctly") |
| assertLimits(t, getTestUserGroup(), appRes) |
| |
| // add broken allocations |
| ask = newAllocationAsk("alloc-na", "not-an-app", appRes) |
| alloc = objects.NewAllocation(nodeID1, ask) |
| node.AddAllocation(alloc) |
| ask = newAllocationAsk("alloc-2", appID1, appRes) |
| alloc = objects.NewAllocation(nodeID1, ask) |
| node.AddAllocation(alloc) |
| assertLimits(t, getTestUserGroup(), appRes) |
| |
| // remove the node: this cannot fail |
| released, confirmed := partition.removeNode(nodeID1) |
| assert.Equal(t, 0, partition.GetTotalNodeCount(), "node list was not updated, node was not removed") |
| assert.Equal(t, 1, len(released), "node did not release correct allocation") |
| assert.Equal(t, 0, len(confirmed), "node did not confirm correct allocation") |
| assert.Equal(t, released[0].GetAllocationKey(), allocAllocationKey, "allocationKey returned by release not the same as on allocation") |
| assertLimits(t, getTestUserGroup(), resources.Zero) |
| |
| assert.NilError(t, err, "the event should have been processed") |
| } |
| |
| // test with a replacement of a placeholder: placeholder and real on the same node that gets removed |
| func TestRemoveNodeWithPlaceholders(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| // add a new app |
| app := newApplication(appID1, "default", defQueue) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| |
| // add a node with allocation: must have the correct app added already |
| nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) |
| node1 := newNodeMaxResource(nodeID1, nodeRes) |
| appRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) |
| ask := newAllocationAskTG("placeholder", appID1, taskGroup, appRes, true) |
| ph := objects.NewAllocation(nodeID1, ask) |
| allocs := []*objects.Allocation{ph} |
| err = partition.AddNode(node1, allocs) |
| assert.NilError(t, err, "add node1 to partition should not have failed") |
| // get what was allocated |
| allocated := node1.GetAllAllocations() |
| assert.Equal(t, 1, len(allocated), "allocation not added correctly to node1 expected 1 got: %v", allocated) |
| assert.Assert(t, resources.Equals(node1.GetAllocatedResource(), appRes), "allocation not added correctly to node1") |
| assertLimits(t, getTestUserGroup(), appRes) |
| assert.Equal(t, 1, partition.getPhAllocationCount(), "number of active placeholders") |
| |
| // fake an ask that is used |
| ask = newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, false) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should be added to app") |
| _, err = app.AllocateAsk(allocKey) |
| assert.NilError(t, err, "ask should have been updated without error") |
| assert.Assert(t, resources.IsZero(app.GetPendingResource()), "app should not have pending resources") |
| assertLimits(t, getTestUserGroup(), appRes) |
| |
| // add real allocation that is replacing the placeholder |
| alloc := objects.NewAllocation(nodeID1, ask) |
| alloc.SetRelease(ph) |
| // double link as if the replacement is ongoing |
| ph.SetRelease(alloc) |
| alloc.SetResult(objects.Replaced) |
| |
| allocs = app.GetAllAllocations() |
| assert.Equal(t, len(allocs), 1, "expected one allocation for the app (placeholder)") |
| assertLimits(t, getTestUserGroup(), appRes) |
| |
| // remove the node that has both placeholder and real allocation |
| released, confirmed := partition.removeNode(nodeID1) |
| assert.Equal(t, 0, partition.GetTotalNodeCount(), "node list was not updated, node was not removed") |
| assert.Equal(t, 1, len(released), "node removal did not release correct allocation") |
| assert.Equal(t, 0, len(confirmed), "node removal should not have confirmed allocation") |
| assert.Equal(t, ph.GetAllocationKey(), released[0].GetAllocationKey(), "allocationKey returned by release not the same as the placeholder") |
| assert.Equal(t, 0, partition.getPhAllocationCount(), "number of active placeholders") |
| allocs = app.GetAllAllocations() |
| assert.Equal(t, 0, len(allocs), "expected no allocations for the app") |
| assert.Assert(t, resources.Equals(app.GetPendingResource(), appRes), "app should have updated pending resources") |
| // check the interim state of the placeholder involved |
| assert.Equal(t, 0, ph.GetReleaseCount(), "placeholder should have no releases linked anymore") |
| assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0})) |
| } |
| |
| func TestCalculateNodesResourceUsage(t *testing.T) { |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| oldCapacity := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100}) |
| node := newNodeMaxResource(nodeID1, oldCapacity) |
| err = partition.AddNode(node, nil) |
| assert.NilError(t, err) |
| |
| occupiedResources := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50}) |
| alloc := objects.NewAllocation(nodeID1, newAllocationAsk("key", "appID", occupiedResources)) |
| node.AddAllocation(alloc) |
| usageMap := partition.calculateNodesResourceUsage() |
| assert.Equal(t, node.GetAvailableResource().Resources["first"], resources.Quantity(50)) |
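| // 50 out of 100 allocated is 50% usage, which is counted in the 5th of the ten usage buckets (index 4) |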
| assert.Equal(t, usageMap["first"][4], 1) |
| |
| occupiedResources = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50}) |
| alloc = objects.NewAllocation(nodeID1, newAllocationAsk("key", "appID", occupiedResources)) |
| node.AddAllocation(alloc) |
| usageMap = partition.calculateNodesResourceUsage() |
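| // the node is now fully allocated, so it is counted in the last usage bucket (index 9) |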
| assert.Equal(t, node.GetAvailableResource().Resources["first"], resources.Quantity(0)) |
| assert.Equal(t, usageMap["first"][9], 1) |
| |
| newCapacity := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 80}) |
| node.SetCapacity(newCapacity) |
| usageMap = partition.calculateNodesResourceUsage() |
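| // capacity dropped below the allocated amount: available goes negative, usage stays capped at 100% (index 9) |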
| assert.Assert(t, node.GetAvailableResource().HasNegativeValue()) |
| assert.Equal(t, usageMap["first"][9], 1) |
| } |
| |
| // test basic placeholder preemption |
| // setup: |
| // queue quota max size: 16GB / 16cpu |
| // nodes: 2 * 8GB / 8 cpu |
| // create an application with allocation: 4 GB / 4 cpu |
| // create a gang application requesting: 7 * 2GB / 2cpu |
| // create a daemon set pod for one of the nodes asking for 2GB / 2 cpu |
| // |
| // ensure the placeholder has been preempted and the released resources have been given to the request that asked for them |
| // ensure the preempted placeholder is accounted as timed out in the gang app placeholder data |
| func TestPlaceholderDataWithPlaceholderPreemption(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| // add a new app1 |
| app1, _ := newApplicationWithHandler(appID1, "default", defQueue) |
| err = partition.AddApplication(app1) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| |
| // add a node with allocation: must have the correct app1 added already |
| resMap := map[string]string{"mem": "2M", "vcore": "2"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "Unexpected error when creating resource from map") |
| appRes := res.Clone() |
| phRes := res.Clone() |
| newRes := res.Clone() |
| appRes.MultiplyTo(2) |
| newRes.MultiplyTo(4) |
| phRes.MultiplyTo(7) |
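| // base resource is 2M memory / 2 vcores: the running app uses 2x, each node holds 4x, |
| // and the gang app declares a total placeholder ask of 7x (one more than the two nodes have left) |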
| |
| ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 1, false) |
| alloc := objects.NewAllocation(nodeID1, ask) |
| allocs := []*objects.Allocation{alloc} |
| |
| node1 := newNodeMaxResource(nodeID1, newRes) |
| err = partition.AddNode(node1, allocs) |
| assert.NilError(t, err, "add node1 to partition should not have failed") |
| assert.Equal(t, 1, partition.nodes.GetNodeCount(), "node list not correct") |
| |
| // get what was allocated |
| allocated := node1.GetAllAllocations() |
| assert.Equal(t, 1, len(allocated), "allocation not added correctly to node1") |
| assert.Assert(t, resources.Equals(node1.GetAllocatedResource(), appRes), "allocation not added correctly to node1") |
| |
| node2 := newNodeMaxResource(nodeID2, newRes) |
| err = partition.AddNode(node2, nil) |
| assert.NilError(t, err, "test node add failed unexpected") |
| assert.Equal(t, 2, partition.nodes.GetNodeCount(), "node list not correct") |
| assert.Assert(t, resources.Equals(partition.GetQueue(defQueue).GetAllocatedResource(), appRes), "Queue allocated resource is not correct") |
| |
| // add the gang app that will get 6 placeholder requests |
| gangApp := newApplicationTGTagsWithPhTimeout(appID2, "default", defQueue, phRes, nil, 0) |
| err = partition.AddApplication(gangApp) |
| assert.NilError(t, err, "app1-1 should have been added to the partition") |
| |
| var lastPh string |
| for i := 1; i <= 6; i++ { |
| // add an ask for a placeholder and allocate |
| lastPh = phID + strconv.Itoa(i) |
| ask = newAllocationAskTG(lastPh, appID2, taskGroup, res, true) |
| err = gangApp.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add placeholder ask %s to app1", lastPh) |
| // try to allocate a placeholder via normal allocate |
| ph := partition.tryAllocate() |
| assert.Assert(t, ph != nil, "expected placeholder to be allocated") |
| } |
| assert.Equal(t, 7, partition.GetTotalAllocationCount(), "placeholder allocation should be counted as normal allocations on the partition") |
| assert.Equal(t, 6, partition.getPhAllocationCount(), "placeholder allocations should be counted as placeholders on the partition") |
| |
| assertPlaceholderData(t, gangApp, 6, 0) |
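| // removing app1 releases its real allocation, the gang app placeholders are not affected |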
| partition.removeApplication(appID1) |
| assert.Equal(t, 6, partition.GetTotalAllocationCount(), "remove app did not remove allocation from count") |
| assert.Equal(t, 6, partition.getPhAllocationCount(), "placeholder allocations changed unexpectedly") |
| |
| // add a new app (app2) |
| app2, testHandler2 := newApplicationWithHandler(appID3, "default", defQueue) |
| err = partition.AddApplication(app2) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| |
| // required node set on ask |
| ask2 := newAllocationAsk(allocKey2, appID3, res) |
| ask2.SetRequiredNode(nodeID2) |
| err = app2.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app1-1") |
| |
| // since node-2 available resource is less than needed, reserve the node |
| alloc = partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("allocation attempt should not have returned an allocation") |
| } |
| // check if updated (must be after allocate call) |
| assert.Equal(t, 1, len(app2.GetReservations()), "app reservation should have been updated") |
| assert.Equal(t, 1, len(app2.GetAskReservations(allocKey2)), "ask should have been reserved") |
| |
| // try through reserved scheduling cycle this should trigger preemption |
| alloc = partition.tryReservedAllocate() |
| if alloc != nil { |
| t.Fatal("reserved allocation attempt should not have returned an allocation") |
| } |
| |
| // check if there is a release event for the expected allocation |
| var found bool |
| var releasedAllocationKey string |
| for _, event := range testHandler2.GetEvents() { |
| if allocRelease, ok := event.(*rmevent.RMReleaseAllocationEvent); ok { |
| found = allocRelease.ReleasedAllocations[0].AllocationKey == lastPh |
| releasedAllocationKey = allocRelease.ReleasedAllocations[0].AllocationKey |
| break |
| } |
| } |
| assert.Assert(t, found, "release allocation event not found in list") |
| // release allocation: do what the context would do after the shim processing |
| release := &si.AllocationRelease{ |
| PartitionName: partition.Name, |
| ApplicationID: appID2, |
| AllocationKey: releasedAllocationKey, |
| TerminationType: si.TerminationType_PREEMPTED_BY_SCHEDULER, |
| } |
| releases, confirmed := partition.removeAllocation(release) |
| assert.Equal(t, 0, len(releases), "not expecting any released allocations") |
| assert.Assert(t, confirmed == nil, "not expecting any confirmed allocations") |
| assert.Equal(t, 5, partition.GetTotalAllocationCount(), "preempted placeholder should be removed from allocations") |
| assert.Equal(t, 5, partition.getPhAllocationCount(), "preempted placeholder should be removed") |
| assertPlaceholderData(t, gangApp, 6, 1) |
| } |
| |
| // test node removal effect on placeholder data |
| // setup: |
| // queue quota max size: 16GB / 16cpu |
| // nodes: 2 * 8GB / 8 cpu |
| // create an application with allocation: 4 GB / 4 cpu |
| // create a gang application requesting: 7 * 2GB / 2cpu |
| // Remove the node where placeholders are running |
| // |
| // ensure the removed placeholders are accounted as timed out in the gang app placeholder data |
| func TestPlaceholderDataWithNodeRemoval(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| // add a new app1 |
| app1, _ := newApplicationWithHandler(appID1, "default", defQueue) |
| err = partition.AddApplication(app1) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| |
| resMap := map[string]string{"mem": "2M", "vcore": "2"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "Unexpected error when creating resource from map") |
| appRes := res.Clone() |
| phRes := res.Clone() |
| newRes := res.Clone() |
| appRes.MultiplyTo(2) |
| newRes.MultiplyTo(4) |
| phRes.MultiplyTo(7) |
| |
| // add a node with allocation: must have the correct app1 added already |
| ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 1, false) |
| alloc := objects.NewAllocation(nodeID1, ask) |
| allocs := []*objects.Allocation{alloc} |
| |
| node1 := newNodeMaxResource(nodeID1, newRes) |
| err = partition.AddNode(node1, allocs) |
| assert.NilError(t, err, "add node1 to partition should not have failed") |
| assert.Equal(t, 1, partition.nodes.GetNodeCount(), "node list not correct") |
| |
| // get what was allocated |
| allocated := node1.GetAllAllocations() |
| assert.Equal(t, 1, len(allocated), "allocation not added correctly to node1") |
| assert.Assert(t, resources.Equals(node1.GetAllocatedResource(), appRes), "allocation not added correctly to node1") |
| |
| node2 := newNodeMaxResource(nodeID2, newRes) |
| err = partition.AddNode(node2, nil) |
| assert.NilError(t, err, "test node add failed unexpected") |
| assert.Equal(t, 2, partition.nodes.GetNodeCount(), "node list not correct") |
| assert.Assert(t, resources.Equals(partition.GetQueue(defQueue).GetAllocatedResource(), appRes), "Queue allocated resource is not correct") |
| |
| // add the gang app that will get 7 placeholder requests |
| gangApp := newApplicationTGTagsWithPhTimeout(appID2, "default", defQueue, phRes, nil, 0) |
| err = partition.AddApplication(gangApp) |
| assert.NilError(t, err, "app1-1 should have been added to the partition") |
| |
| for i := 1; i <= 6; i++ { |
| // add an ask for a placeholder and allocate |
| ask = newAllocationAskTG(phID+strconv.Itoa(i+1), appID2, taskGroup, res, true) |
| err = gangApp.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add placeholder ask ph-1 to app1") |
| // try to allocate a placeholder via normal allocate |
| ph := partition.tryAllocate() |
| if ph == nil { |
| t.Fatal("expected placeholder to be allocated") |
| } |
| } |
| |
| // add an ask for a last placeholder and allocate |
| lastPh := phID + strconv.Itoa(7) |
| ask = newAllocationAskTG(lastPh, appID2, taskGroup, res, true) |
| err = gangApp.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add placeholder ask ph-1 to app1") |
| |
| // try to allocate a last placeholder via normal allocate |
| partition.tryAllocate() |
| |
| assertPlaceholderData(t, gangApp, 7, 0) |
| |
| // remove node2: the placeholders allocated on it are released and counted as timed out |
| partition.removeNode(nodeID2) |
| assertPlaceholderData(t, gangApp, 7, 4) |
| } |
| |
| // Test that the removal of a placeholder is accounted as timed out in the app placeholder data |
| // setup: |
| // queue quota max size: 16GB / 16cpu |
| // nodes: 2 * 8GB / 8 cpu |
| // create an application with allocation: 4 GB / 4 cpu |
| // create a gang application requesting: 7 * 2GB / 2cpu |
| // Remove one of the placeholder allocations (stopped by the RM) |
| // |
| // ensure the removed placeholder is accounted as timed out in the gang app placeholder data |
| func TestPlaceholderDataWithRemoval(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| // add a new app1 |
| app1, _ := newApplicationWithHandler(appID1, "default", defQueue) |
| err = partition.AddApplication(app1) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| |
| resMap := map[string]string{"mem": "2M", "vcore": "2"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "Unexpected error when creating resource from map") |
| appRes := res.Clone() |
| phRes := res.Clone() |
| newRes := res.Clone() |
| appRes.MultiplyTo(2) |
| newRes.MultiplyTo(4) |
| phRes.MultiplyTo(7) |
| |
| // add a node with allocation: must have the correct app1 added already |
| ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 1, false) |
| alloc := objects.NewAllocation(nodeID1, ask) |
| allocs := []*objects.Allocation{alloc} |
| |
| node1 := newNodeMaxResource(nodeID1, newRes) |
| err = partition.AddNode(node1, allocs) |
| assert.NilError(t, err, "add node1 to partition should not have failed") |
| assert.Equal(t, 1, partition.nodes.GetNodeCount(), "node list not correct") |
| |
| // get what was allocated |
| allocated := node1.GetAllAllocations() |
| assert.Equal(t, 1, len(allocated), "allocation not added correctly to node1") |
| assert.Assert(t, resources.Equals(node1.GetAllocatedResource(), appRes), "allocation not added correctly to node1") |
| |
| node2 := newNodeMaxResource(nodeID2, newRes) |
| err = partition.AddNode(node2, nil) |
| assert.NilError(t, err, "test node add failed unexpected") |
| assert.Equal(t, 2, partition.nodes.GetNodeCount(), "node list not correct") |
| assert.Assert(t, resources.Equals(partition.GetQueue(defQueue).GetAllocatedResource(), appRes), "Queue allocated resource is not correct") |
| |
| // add the gang app that will get 7 placeholder requests |
| gangApp := newApplicationTGTagsWithPhTimeout(appID2, "default", defQueue, phRes, nil, 0) |
| err = partition.AddApplication(gangApp) |
| assert.NilError(t, err, "app1-1 should have been added to the partition") |
| |
| var lastPhAllocationKey string |
| for i := 1; i <= 6; i++ { |
| // add an ask for a placeholder and allocate |
| ask = newAllocationAskTG(phID+strconv.Itoa(i+1), appID2, taskGroup, res, true) |
| err = gangApp.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add placeholder ask ph-1 to app1") |
| // try to allocate a placeholder via normal allocate |
| ph := partition.tryAllocate() |
| if ph == nil { |
| t.Fatal("expected placeholder to be allocated") |
| } |
| lastPhAllocationKey = ph.GetAllocationKey() |
| } |
| |
| // add an ask for a last placeholder and allocate |
| lastPh := phID + strconv.Itoa(7) |
| ask = newAllocationAskTG(lastPh, appID2, taskGroup, res, true) |
| err = gangApp.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add placeholder ask ph-1 to app1") |
| |
| // try to allocate a last placeholder via normal allocate |
| partition.tryAllocate() |
| assertPlaceholderData(t, gangApp, 7, 0) |
| |
| // release allocation: do what the context would do after the shim processing |
| release := &si.AllocationRelease{ |
| PartitionName: partition.Name, |
| ApplicationID: appID2, |
| AllocationKey: lastPhAllocationKey, |
| TerminationType: si.TerminationType_STOPPED_BY_RM, |
| } |
| releases, _ := partition.removeAllocation(release) |
| assert.Equal(t, 1, len(releases), "unexpected number of allocations released") |
| assertPlaceholderData(t, gangApp, 7, 1) |
| } |
| |
| // assertPlaceholderData checks the placeholder data of the gang app: a single task group with the given total count, the given number timed out and none replaced |
| func assertPlaceholderData(t *testing.T, gangApp *objects.Application, count int64, timedout int64) { |
| assert.Equal(t, len(gangApp.GetAllPlaceholderData()), 1) |
| assert.Equal(t, gangApp.GetAllPlaceholderData()[0].TaskGroupName, taskGroup) |
| assert.Equal(t, gangApp.GetAllPlaceholderData()[0].Count, count) |
| assert.Equal(t, gangApp.GetAllPlaceholderData()[0].Replaced, int64(0)) |
| assert.Equal(t, gangApp.GetAllPlaceholderData()[0].TimedOut, timedout) |
| } |
| |
| // test with a replacement of a placeholder: placeholder on the removed node, real on the 2nd node |
| func TestRemoveNodeWithReplacement(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| // add a new app |
| app := newApplication(appID1, "default", defQueue) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| |
| // add a node with allocation: must have the correct app added already |
| nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) |
| node1 := newNodeMaxResource(nodeID1, nodeRes) |
| appRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) |
| ask := newAllocationAskAll("placeholder", appID1, taskGroup, appRes, 1, true) |
| ph := objects.NewAllocation(nodeID1, ask) |
| allocs := []*objects.Allocation{ph} |
| err = partition.AddNode(node1, allocs) |
| assert.NilError(t, err, "add node1 to partition should not have failed") |
| // get what was allocated |
| allocated := node1.GetAllAllocations() |
| assert.Equal(t, 1, len(allocated), "allocation not added correctly to node1") |
| assertLimits(t, getTestUserGroup(), appRes) |
| assert.Assert(t, resources.Equals(node1.GetAllocatedResource(), appRes), "allocation not added correctly to node1") |
| |
| node2 := setupNode(t, nodeID2, partition, nodeRes) |
| assert.Equal(t, 2, partition.GetTotalNodeCount(), "node list was not updated as expected") |
| |
| // fake an ask that is used |
| ask = newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, false) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should be added to app") |
| _, err = app.AllocateAsk(allocKey) |
| assert.NilError(t, err, "ask should have been updated without error") |
| assert.Assert(t, resources.IsZero(app.GetPendingResource()), "app should not have pending resources") |
| |
| // add real allocation that is replacing the placeholder on the 2nd node |
| alloc := objects.NewAllocation(nodeID2, ask) |
| alloc.SetRelease(ph) |
| alloc.SetResult(objects.Replaced) |
| node2.AddAllocation(alloc) |
| allocated = node2.GetAllAllocations() |
| assert.Equal(t, 1, len(allocated), "allocation not added correctly to node2") |
| assert.Assert(t, resources.Equals(node2.GetAllocatedResource(), appRes), "allocation not added correctly to node2 (resource count)") |
| assertLimits(t, getTestUserGroup(), appRes) |
| |
| // double link as if the replacement is ongoing |
| ph.SetRelease(alloc) |
| |
| allocs = app.GetAllAllocations() |
| assert.Equal(t, len(allocs), 1, "expected one allocation for the app (placeholder)") |
| assertLimits(t, getTestUserGroup(), appRes) |
| |
| // remove the node with the placeholder |
| released, confirmed := partition.removeNode(nodeID1) |
| assert.Equal(t, 1, partition.GetTotalNodeCount(), "node list was not updated, node was not removed") |
| assert.Equal(t, 1, len(node2.GetAllAllocations()), "remaining node should have allocation") |
| assert.Equal(t, 1, len(released), "node removal did not release correct allocation") |
| assert.Equal(t, 1, len(confirmed), "node removal did not confirm correct allocation") |
| assert.Equal(t, ph.GetAllocationKey(), released[0].GetAllocationKey(), "allocationKey returned by release not the same as the placeholder") |
| assert.Equal(t, alloc.GetAllocationKey(), confirmed[0].GetAllocationKey(), "allocationKey returned by confirmed not the same as the real allocation") |
| assert.Assert(t, resources.IsZero(app.GetPendingResource()), "app should not have pending resources") |
| assert.Assert(t, !app.IsCompleting(), "app should not be COMPLETING after confirming allocation") |
| allocs = app.GetAllAllocations() |
| assert.Equal(t, 1, len(allocs), "expected one allocation for the app (real)") |
| assert.Equal(t, alloc.GetAllocationKey(), allocs[0].GetAllocationKey(), "allocationKey for the app is not the same as the real allocation") |
| assert.Equal(t, objects.Allocated, allocs[0].GetResult(), "allocation state should be allocated") |
| assert.Equal(t, 0, allocs[0].GetReleaseCount(), "real allocation should have no releases linked anymore") |
| assertLimits(t, getTestUserGroup(), appRes) |
| } |
| |
| // test with a replacement of a placeholder: real on the removed node placeholder on the 2nd node |
| func TestRemoveNodeWithReal(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| // add a new app |
| app := newApplication(appID1, "default", defQueue) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| |
| // add a node with allocation: must have the correct app added already |
| nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) |
| node1 := newNodeMaxResource(nodeID1, nodeRes) |
| appRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) |
| ask := newAllocationAskAll("placeholder", appID1, taskGroup, appRes, 1, true) |
| ph := objects.NewAllocation(nodeID1, ask) |
| allocs := []*objects.Allocation{ph} |
| err = partition.AddNode(node1, allocs) |
| assert.NilError(t, err, "add node1 to partition should not have failed") |
| // get what was allocated |
| allocated := node1.GetAllAllocations() |
| assert.Equal(t, 1, len(allocated), "allocation not added correctly to node1") |
| assert.Assert(t, resources.Equals(node1.GetAllocatedResource(), appRes), "allocation not added correctly to node1") |
| assertLimits(t, getTestUserGroup(), appRes) |
| |
| node2 := setupNode(t, nodeID2, partition, nodeRes) |
| assert.Equal(t, 2, partition.GetTotalNodeCount(), "node list was not updated as expected") |
| |
| // fake an ask that is used |
| ask = newAllocationAskAll(allocKey, appID1, taskGroup, appRes, 1, false) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "ask should be added to app") |
| _, err = app.AllocateAsk(allocKey) |
| assert.NilError(t, err, "ask should have been updated without error") |
| assert.Assert(t, resources.IsZero(app.GetPendingResource()), "app should not have pending resources") |
| |
| // add real allocation that is replacing the placeholder on the 2nd node |
| alloc := objects.NewAllocation(nodeID2, ask) |
| alloc.SetRelease(ph) |
| alloc.SetResult(objects.Replaced) |
| node2.AddAllocation(alloc) |
| allocated = node2.GetAllAllocations() |
| assert.Equal(t, 1, len(allocated), "allocation not added correctly to node2") |
| assert.Assert(t, resources.Equals(node2.GetAllocatedResource(), appRes), "allocation not added correctly to node2 (resource count)") |
| assertLimits(t, getTestUserGroup(), appRes) |
| |
| // double link as if the replacement is ongoing |
| ph.SetRelease(alloc) |
| |
| allocs = app.GetAllAllocations() |
| assert.Equal(t, len(allocs), 1, "expected one allocation for the app (placeholder)") |
| |
| // remove the node with the real allocation |
| released, confirmed := partition.removeNode(nodeID2) |
| assert.Equal(t, 1, partition.GetTotalNodeCount(), "node list was not updated, node was not removed") |
| assert.Equal(t, 0, len(released), "node removal did not release correct allocation") |
| assert.Equal(t, 0, len(confirmed), "node removal did not confirm correct allocation") |
| assert.Assert(t, resources.Equals(app.GetPendingResource(), appRes), "app should have updated pending resources") |
| allocs = app.GetAllAllocations() |
| assert.Equal(t, 1, len(allocs), "expected one allocation for the app (placeholder") |
| assert.Equal(t, ph.GetAllocationKey(), allocs[0].GetAllocationKey(), "allocationKey for the app is not the same as the real allocation") |
| assert.Equal(t, 0, ph.GetReleaseCount(), "no inflight replacements linked") |
| assertLimits(t, getTestUserGroup(), appRes) |
| } |
| |
| func TestAddApp(t *testing.T) { |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| // add a new app |
| app := newApplication(appID1, "default", defQueue) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| // add the same app |
| err = partition.AddApplication(app) |
| if err == nil { |
| t.Errorf("add same application to partition should have failed but did not") |
| } |
| |
| // mark partition stopped, no new application can be added |
| err = partition.handlePartitionEvent(objects.Stop) |
| assert.NilError(t, err, "partition state change failed unexpectedly") |
| |
| app = newApplication(appID2, "default", defQueue) |
| err = partition.AddApplication(app) |
| if err == nil || partition.getApplication(appID2) != nil { |
| t.Errorf("add application on stopped partition should have failed but did not") |
| } |
| |
| // mark partition for deletion, no new application can be added |
| partition.stateMachine.SetState(objects.Active.String()) |
| err = partition.handlePartitionEvent(objects.Remove) |
| assert.NilError(t, err, "partition state change failed unexpectedly") |
| app = newApplication(appID3, "default", defQueue) |
| err = partition.AddApplication(app) |
| if err == nil || partition.getApplication(appID3) != nil { |
| t.Errorf("add application on draining partition should have failed but did not") |
| } |
| } |
| |
| func TestAddAppForced(t *testing.T) { |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| // add a new app to an invalid queue |
| app := newApplication(appID1, "default", "root.invalid") |
| err = partition.AddApplication(app) |
| if err == nil || partition.getApplication(appID1) != nil { |
| t.Fatalf("add application to nonexistent queue should have failed but did not") |
| } |
| |
| // re-add the app, but mark it as forced. this should create the recovery queue and assign the app to it |
| app = newApplicationTags(appID1, "default", "root.invalid", map[string]string{siCommon.AppTagCreateForce: "true"}) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "app create failed") |
| partApp := partition.getApplication(appID1) |
| if partApp == nil { |
| t.Fatalf("app not found after adding to partition") |
| } |
| recoveryQueue := partition.GetQueue(common.RecoveryQueueFull) |
| if recoveryQueue == nil { |
| t.Fatalf("recovery queue not found") |
| } |
| assert.Equal(t, common.RecoveryQueueFull, partApp.GetQueuePath(), "wrong queue path for app") |
| assert.Check(t, recoveryQueue == partApp.GetQueue(), "wrong queue for app") |
| assert.Equal(t, 1, len(recoveryQueue.GetCopyOfApps()), "wrong queue length") |
| |
| // add second forced app. this should use the existing recovery queue rather than recreating it |
| app2 := newApplicationTags(appID2, "default", "root.invalid2", map[string]string{siCommon.AppTagCreateForce: "true"}) |
| err = partition.AddApplication(app2) |
| assert.NilError(t, err, "app2 create failed") |
| partApp2 := partition.getApplication(appID2) |
| if partApp2 == nil { |
| t.Fatalf("app2 not found after adding to partition") |
| } |
| assert.Equal(t, common.RecoveryQueueFull, partApp2.GetQueuePath(), "wrong queue path for app2") |
| assert.Check(t, recoveryQueue == partApp2.GetQueue(), "wrong queue for app2") |
| assert.Equal(t, 2, len(recoveryQueue.GetCopyOfApps()), "wrong queue length") |
| |
| // add third app (not forced), but referencing the recovery queue. this should fail. |
| app3 := newApplication(appID3, "default", common.RecoveryQueueFull) |
| err = partition.AddApplication(app3) |
| if err == nil || partition.getApplication(appID3) != nil { |
| t.Fatalf("add app3 to recovery queue should have failed but did not") |
| } |
| |
| // re-add third app, but forced. This should succeed. |
| app3 = newApplicationTags(appID3, "default", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}) |
| err = partition.AddApplication(app3) |
| assert.NilError(t, err, "app3 create failed") |
| partApp3 := partition.getApplication(appID3) |
| if partApp3 == nil { |
| t.Fatalf("app3 not found after adding to partition") |
| } |
| assert.Equal(t, common.RecoveryQueueFull, partApp3.GetQueuePath(), "wrong queue path for app3") |
| assert.Check(t, recoveryQueue == partApp3.GetQueue(), "wrong queue for app3") |
| assert.Equal(t, 3, len(recoveryQueue.GetCopyOfApps()), "wrong queue length") |
| } |
| |
| func TestAddAppForcedWithPlacement(t *testing.T) { |
| confWith := configs.PartitionConfig{ |
| Name: "test", |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "root", |
| Parent: true, |
| SubmitACL: "*", |
| Queues: nil, |
| }, |
| }, |
| PlacementRules: []configs.PlacementRule{ |
| { |
| Name: "tag", |
| Value: "queue", |
| Create: true, |
| }, |
| }, |
| Limits: nil, |
| NodeSortPolicy: configs.NodeSortingPolicy{}, |
| } |
| partition, err := newPartitionContext(confWith, rmID, nil) |
| assert.NilError(t, err, "test partition create failed with error") |
| |
| // add a new app using tag rule |
| app := newApplicationTags(appID1, "default", "", map[string]string{"queue": "root.test"}) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app to tagged queue") |
| assert.Equal(t, "root.test", app.GetQueuePath(), "app assigned to wrong queue") |
| |
| // add a second app without a tag rule |
| app2 := newApplicationTags(appID2, "default", "root.untagged", map[string]string{}) |
| err = partition.AddApplication(app2) |
| if err == nil || partition.getApplication(appID2) != nil { |
| t.Fatalf("add app2 to fixed queue should have failed but did not") |
| } |
| |
| // attempt to add the app again, but with forced addition |
| app2 = newApplicationTags(appID2, "default", "root.untagged", map[string]string{siCommon.AppTagCreateForce: "true"}) |
| err = partition.AddApplication(app2) |
| assert.NilError(t, err, "failed to add app2 to tagged queue") |
| assert.Equal(t, common.RecoveryQueueFull, app2.GetQueuePath(), "app2 assigned to wrong queue") |
| |
| // add a third app, but with the recovery queue tagged |
| app3 := newApplicationTags(appID3, "default", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}) |
| err = partition.AddApplication(app3) |
| assert.NilError(t, err, "failed to add app3 to tagged queue") |
| assert.Equal(t, common.RecoveryQueueFull, app3.GetQueuePath(), "app3 assigned to wrong queue") |
| } |
| |
| func TestAddAppTaskGroup(t *testing.T) { |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| // add a new app: TG specified with resource, no max set on the queue |
| task := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000}) |
| app := newApplicationTG(appID1, "default", defQueue, task) |
| assert.Assert(t, resources.Equals(app.GetPlaceholderAsk(), task), "placeholder ask not set as expected") |
| // queue sort policy is FIFO: this should work |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "add application with zero task group to partition should not have failed") |
| |
| app = newApplicationTG(appID2, "default", defQueue, task) |
| assert.Assert(t, resources.Equals(app.GetPlaceholderAsk(), task), "placeholder ask not set as expected") |
| |
| // queue now has fair as sort policy: app add should fail |
| queue := partition.GetQueue(defQueue) |
| err = queue.ApplyConf(configs.QueueConfig{ |
| Name: "default", |
| Parent: false, |
| Queues: nil, |
| Properties: map[string]string{configs.ApplicationSortPolicy: "fair"}, |
| }) |
| assert.NilError(t, err, "updating queue should not have failed") |
| queue.UpdateQueueProperties() |
| err = partition.AddApplication(app) |
| if err == nil || partition.getApplication(appID2) != nil { |
| t.Errorf("add application should have failed due to queue sort policy but did not") |
| } |
| } |
| |
| func TestRemoveApp(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| // add a new app that will just sit around to make sure we remove the right one |
| appNotRemoved := "will_not_remove" |
| app := newApplication(appNotRemoved, "default", defQueue) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| // add a node to allow adding an allocation |
| setupNode(t, nodeID1, partition, resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000000})) |
| |
| appRes := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000}) |
| ask := newAllocationAsk("alloc-nr", appNotRemoved, appRes) |
| alloc := objects.NewAllocation(nodeID1, ask) |
| err = partition.addAllocation(alloc) |
| assert.NilError(t, err, "add allocation to partition should not have failed") |
| assertLimits(t, getTestUserGroup(), appRes) |
| |
| allocs := partition.removeApplication("does_not_exist") |
| if allocs != nil { |
| t.Errorf("non existing application returned unexpected values: allocs = %v", allocs) |
| } |
| |
| // add another new app |
| app = newApplication(appID1, "default", defQueue) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| assertLimits(t, getTestUserGroup(), appRes) |
| |
| // remove the newly added app (no allocations) |
| allocs = partition.removeApplication(appID1) |
| assert.Equal(t, 0, len(allocs), "existing application without allocations returned allocations %v", allocs) |
| assert.Equal(t, 1, len(partition.applications), "existing application was not removed") |
| assertLimits(t, getTestUserGroup(), appRes) |
| |
| // add the application again and then an allocation |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| |
| ask = newAllocationAsk("alloc-1", appID1, appRes) |
| alloc = objects.NewAllocation(nodeID1, ask) |
| err = partition.addAllocation(alloc) |
| assert.NilError(t, err, "add allocation to partition should not have failed") |
| assertLimits(t, getTestUserGroup(), resources.Multiply(appRes, 2)) |
| |
| // remove the newly added app |
| allocs = partition.removeApplication(appID1) |
| assert.Equal(t, 1, len(allocs), "existing application with allocations returned unexpected allocations %v", allocs) |
| assert.Equal(t, 1, len(partition.applications), "existing application was not removed") |
| if partition.GetTotalAllocationCount() != 1 { |
| t.Errorf("allocation that should have been left was removed") |
| } |
| assertLimits(t, getTestUserGroup(), appRes) |
| |
| allocs = partition.removeApplication("will_not_remove") |
| assert.Equal(t, 1, len(allocs), "existing application with allocations returned unexpected allocations %v", allocs) |
| assertLimits(t, getTestUserGroup(), resources.Zero) |
| } |
| |
| func TestRemoveAppAllocs(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| // add a new app that will just sit around to make sure we remove the right one |
| appNotRemoved := "will_not_remove" |
| app := newApplication(appNotRemoved, "default", defQueue) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| // add a node to allow adding an allocation |
| setupNode(t, nodeID1, partition, resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000000})) |
| |
| appRes := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000}) |
| ask := newAllocationAsk("alloc-nr", appNotRemoved, appRes) |
| alloc := objects.NewAllocation(nodeID1, ask) |
| err = partition.addAllocation(alloc) |
| assert.NilError(t, err, "add allocation to partition should not have failed") |
| assertLimits(t, getTestUserGroup(), appRes) |
| |
| ask = newAllocationAsk("alloc-1", appNotRemoved, appRes) |
| allocationKey := "alloc-1" |
| alloc = objects.NewAllocation(nodeID1, ask) |
| err = partition.addAllocation(alloc) |
| assert.NilError(t, err, "add allocation to partition should not have failed") |
| assertLimits(t, getTestUserGroup(), resources.Multiply(appRes, 2)) |
| release := &si.AllocationRelease{ |
| PartitionName: "default", |
| ApplicationID: "", |
| AllocationKey: "", |
| TerminationType: si.TerminationType_STOPPED_BY_RM, |
| } |
| |
| allocs, _ := partition.removeAllocation(release) |
| assert.Equal(t, 0, len(allocs), "empty removal request returned allocations: %v", allocs) |
| assertLimits(t, getTestUserGroup(), resources.Multiply(appRes, 2)) |
| // create a new release without app: should just return |
| release.ApplicationID = "does_not_exist" |
| allocs, _ = partition.removeAllocation(release) |
| assert.Equal(t, 0, len(allocs), "removal request for non existing application returned allocations: %v", allocs) |
| assertLimits(t, getTestUserGroup(), resources.Multiply(appRes, 2)) |
| // create a new release with app, non existing allocation: should just return |
| release.ApplicationID = appNotRemoved |
| release.AllocationKey = "does_not_exist" |
| allocs, _ = partition.removeAllocation(release) |
| assert.Equal(t, 0, len(allocs), "removal request for non existing allocation returned allocations: %v", allocs) |
| assertLimits(t, getTestUserGroup(), resources.Multiply(appRes, 2)) |
| // create a new release with app, existing allocation: should return 1 alloc |
| assert.Equal(t, 2, partition.GetTotalAllocationCount(), "pre-remove allocation list incorrect: %v", partition.allocations) |
| release.AllocationKey = allocationKey |
| allocs, _ = partition.removeAllocation(release) |
| assert.Equal(t, 1, len(allocs), "removal request for existing allocation returned wrong allocations: %v", allocs) |
| assert.Equal(t, 1, partition.GetTotalAllocationCount(), "allocation removal requests removed more than expected: %v", partition.allocations) |
| assertLimits(t, getTestUserGroup(), resources.Multiply(appRes, 1)) |
| // create a new release with app, no allocationKey: should return last left alloc |
| release.AllocationKey = "" |
| allocs, _ = partition.removeAllocation(release) |
| assert.Equal(t, 1, len(allocs), "removal request for existing allocation returned wrong allocations: %v", allocs) |
| assert.Equal(t, 0, partition.GetTotalAllocationCount(), "removal requests did not remove all allocations: %v", partition.allocations) |
| assertLimits(t, getTestUserGroup(), resources.Zero) |
| } |
| |
| func TestRemoveAllPlaceholderAllocs(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| setupNode(t, nodeID1, partition, resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000000})) |
| |
| // add a new app that will just sit around to make sure we remove the right one |
| app := newApplication(appID1, "default", defQueue) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "10"}) |
| assert.NilError(t, err, "failed to create resource") |
| phAsk1 := newAllocationAskTG(phID, appID1, taskGroup, res, true) |
| phAlloc1 := objects.NewAllocation(nodeID1, phAsk1) |
| err = partition.addAllocation(phAlloc1) |
| assert.NilError(t, err, "could not add allocation to partition") |
| phAsk2 := newAllocationAskTG(phID2, appID1, taskGroup, res, true) |
| phAlloc2 := objects.NewAllocation(nodeID1, phAsk2) |
| err = partition.addAllocation(phAlloc2) |
| assert.NilError(t, err, "could not add allocation to partition") |
| partition.removeAllocation(&si.AllocationRelease{ |
| PartitionName: "default", |
| ApplicationID: appID1, |
| AllocationKey: "", |
| TerminationType: si.TerminationType_STOPPED_BY_RM, |
| }) |
| assert.Equal(t, 0, partition.getPhAllocationCount()) |
| } |
| |
| // Dynamic queue creation based on the name from the rules |
| func TestCreateQueue(t *testing.T) { |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| // top level should fail |
| _, err = partition.createQueue("test", security.UserGroup{}) |
| if err == nil { |
| t.Errorf("top level queue creation did not fail") |
| } |
| |
| // create below leaf |
| _, err = partition.createQueue("root.default.test", security.UserGroup{}) |
| if err == nil { |
| t.Errorf("'root.default.test' queue creation did not fail") |
| } |
| |
| // single level create |
| var queue *objects.Queue |
| queue, err = partition.createQueue("root.test", security.UserGroup{}) |
| assert.NilError(t, err, "'root.test' queue creation failed") |
| if queue == nil { |
| t.Errorf("'root.test' queue creation failed without error") |
| } |
| if queue != nil && !queue.IsLeafQueue() && queue.IsManaged() { |
| t.Errorf("'root.test' queue creation failed not created with correct settings: %v", queue) |
| } |
| |
| // multiple level create |
| queue, err = partition.createQueue("root.parent.test", security.UserGroup{}) |
| assert.NilError(t, err, "'root.parent.test' queue creation failed") |
| if queue == nil { |
| t.Fatalf("'root.parent.test' queue creation failed without error") |
| } |
| if !queue.IsLeafQueue() || queue.IsManaged() { |
| t.Errorf("'root.parent.test' queue not created with correct settings: %v", queue) |
| } |
| queue = partition.GetQueue("root.parent") |
| if queue == nil { |
| t.Errorf("'root.parent' queue creation failed: parent is not set correctly") |
| } |
| if queue != nil && (queue.IsLeafQueue() || queue.IsManaged()) { |
| t.Errorf("'root.parent' parent queue not created with correct settings: %v", queue) |
| } |
| |
| // deep level create |
| queue, err = partition.createQueue("root.parent.next.level.test.leaf", security.UserGroup{}) |
| assert.NilError(t, err, "'root.parent.next.level.test.leaf' queue creation failed") |
| if queue == nil { |
| t.Errorf("'root.parent.next.level.test.leaf' queue creation returned nil queue without error") |
| } |
| if queue != nil && (!queue.IsLeafQueue() || queue.IsManaged()) { |
| t.Errorf("'root.parent.next.level.test.leaf' queue not created with correct settings: %v", queue) |
| } |
| } |
| |
| // Managed queue creation based on the config |
| func TestCreateDeepQueueConfig(t *testing.T) { |
| conf := make([]configs.QueueConfig, 0) |
| conf = append(conf, configs.QueueConfig{ |
| Name: "level1", |
| Parent: true, |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "level2", |
| Parent: true, |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "level3", |
| Parent: true, |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "level4", |
| Parent: true, |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "level5", |
| Parent: false, |
| Queues: nil, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }, |
| }) |
| |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| // the root queue is always set up as the config must be valid when the partition is created |
| root := partition.GetQueue("root") |
| if root == nil { |
| t.Error("root queue not found in partition") |
| } |
| err = partition.addQueue(conf, root) |
| assert.NilError(t, err, "'root.level1.level2.level3.level4.level5' queue creation from config failed") |
| queue := partition.GetQueue("root.level1.level2.level3.level4.level5") |
| if queue == nil { |
| t.Fatal("root.level1.level2.level3.level4.level5 queue not found in partition") |
| } |
| assert.Equal(t, "root.level1.level2.level3.level4.level5", queue.GetQueuePath(), "root.level1.level2.level3.level4.level5 queue not found in partition") |
| } |
| |
| func assertUpdateQueues(t *testing.T, resourceType string, resMap map[string]string) { |
| var resExpect *resources.Resource |
| var err error |
| if len(resMap) > 0 { |
| resExpect, err = resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "resource from conf failed") |
| } else { |
| resExpect = nil |
| } |
| |
| var res configs.Resources |
| switch resourceType { |
| case "max": |
| res = configs.Resources{Max: resMap} |
| case "guaranteed": |
| res = configs.Resources{Guaranteed: resMap} |
| default: |
| res = configs.Resources{Max: resMap, Guaranteed: resMap} |
| } |
| |
| conf := []configs.QueueConfig{ |
| { |
| Name: "parent", |
| Parent: true, |
| Resources: res, |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "leaf", |
| Parent: false, |
| Queues: nil, |
| }, |
| }, |
| }, |
| } |
| |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| // the root queue is always set up as the config must be valid when the partition is created |
| root := partition.GetQueue("root") |
| if root == nil { |
| t.Error("root queue not found in partition") |
| } |
| |
| err = partition.updateQueues(conf, root) |
| assert.NilError(t, err, "queue update from config failed") |
| parent := partition.GetQueue("root.parent") |
| if parent == nil { |
| t.Fatal("parent queue should still exist") |
| } |
| switch resourceType { |
| case "max": |
| assert.Assert(t, resources.Equals(parent.GetMaxResource(), resExpect), "parent queue max resource should have been updated") |
| assert.Assert(t, resources.Equals(parent.GetGuaranteedResource(), nil), "parent queue guaranteed resource should not have been set") |
| case "guaranteed": |
| assert.Assert(t, resources.Equals(parent.GetMaxResource(), nil), "parent queue max resource should not have been set") |
| assert.Assert(t, resources.Equals(parent.GetGuaranteedResource(), resExpect), "parent queue guaranteed resource should have been updated") |
| default: |
| assert.Assert(t, resources.Equals(parent.GetMaxResource(), resExpect), "parent queue max resource should have been updated") |
| assert.Assert(t, resources.Equals(parent.GetGuaranteedResource(), resExpect), "parent queue guaranteed resource should have been updated") |
| } |
| leaf := partition.GetQueue("root.parent.leaf") |
| if leaf == nil { |
| t.Fatal("leaf queue should have been created") |
| } |
| } |
| |
| func TestUpdateQueues(t *testing.T) { |
| conf := []configs.QueueConfig{ |
| { |
| Name: "parent", |
| Parent: false, |
| Queues: nil, |
| }, |
| } |
| |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| // the root queue is always set up as the config must be valid when the partition is created |
| root := partition.GetQueue("root") |
| if root == nil { |
| t.Error("root queue not found in partition") |
| } |
| err = partition.updateQueues(conf, root) |
| assert.NilError(t, err, "queue update from config failed") |
| def := partition.GetQueue(defQueue) |
| if def == nil { |
| t.Fatal("default queue should still exist") |
| } |
| assert.Assert(t, def.IsDraining(), "'root.default' queue should have been marked for removal") |
| |
| assertUpdateQueues(t, "max", map[string]string{"vcore": "2"}) |
| assertUpdateQueues(t, "max", map[string]string{"vcore": "5"}) |
| assertUpdateQueues(t, "max", map[string]string{"memory": "5"}) |
| assertUpdateQueues(t, "guaranteed", map[string]string{"vcore": "2", "memory": "5"}) |
| assertUpdateQueues(t, "guaranteed", map[string]string{"vcore": "4", "memory": "3"}) |
| assertUpdateQueues(t, "guaranteed", map[string]string{"vcore": "10"}) |
| assertUpdateQueues(t, "both", map[string]string{"vcore": "2", "memory": "5"}) |
| assertUpdateQueues(t, "both", map[string]string{"vcore": "5", "memory": "2"}) |
| assertUpdateQueues(t, "both", map[string]string{"vcore": "5"}) |
| assertUpdateQueues(t, "both", map[string]string{}) |
| } |
| |
| func TestReAddQueues(t *testing.T) { |
| conf := []configs.QueueConfig{ |
| { |
| Name: "parent", |
| Parent: true, |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "leaf", |
| Parent: false, |
| Queues: nil, |
| }, |
| }, |
| }, |
| } |
| |
| confDefault := []configs.QueueConfig{ |
| { |
| Name: "default", |
| Parent: false, |
| Queues: nil, |
| }, |
| { |
| Name: "parent", |
| Parent: true, |
| Queues: nil, |
| }, |
| } |
| |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| // the root queue is always set up as the config must be valid when the partition is created |
| root := partition.GetQueue("root") |
| if root == nil { |
| t.Error("root queue not found in partition") |
| } |
| def := partition.GetQueue(defQueue) |
| if def == nil { |
| t.Fatal("default queue should exist") |
| } |
| err = partition.updateQueues(conf, root) |
| assert.NilError(t, err, "queue update from config failed") |
| leaf := partition.GetQueue("root.parent.leaf") |
| if leaf == nil { |
| t.Fatal("leaf queue should be created") |
| } |
| assert.Assert(t, def.IsDraining(), "'root.default' queue should have been marked for removal") |
| err = partition.updateQueues(confDefault, root) |
| assert.NilError(t, err, "queue update from config default failed") |
| assert.Assert(t, def.IsRunning(), "'root.default' queue should have been marked running again") |
| assert.Assert(t, leaf.IsDraining(), "'root.parent.leaf' queue should have been marked for removal") |
| partition.partitionManager.cleanQueues(root) |
| leaf = partition.GetQueue("root.parent.leaf") |
| if leaf != nil { |
| t.Fatal("leaf queue should have been cleaned up") |
| } |
| def = partition.GetQueue(defQueue) |
| if def == nil { |
| t.Fatal("default queue should still exist") |
| } |
| } |
| |
| func TestGetApplication(t *testing.T) { |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| app := newApplication(appID1, "default", defQueue) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "no error expected while adding the application") |
| assert.Equal(t, partition.GetApplication(appID1), app, "partition failed to add app incorrect app returned") |
| app2 := newApplication(appID2, "default", "unknown") |
| err = partition.AddApplication(app2) |
| if err == nil { |
| t.Error("app-2 should not have been added to the partition") |
| } |
| if partition.GetApplication(appID2) != nil { |
| t.Fatal("partition added app incorrectly should have failed") |
| } |
| } |
| |
| func TestGetQueue(t *testing.T) { |
| // get the partition |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "test partition create failed with error") |
| var nilQueue *objects.Queue |
| // test partition has a root queue |
| queue := partition.GetQueue("") |
| assert.Equal(t, queue, nilQueue, "partition with just root returned not nil for empty request: %v", queue) |
| queue = partition.GetQueue("unknown") |
| assert.Equal(t, queue, nilQueue, "partition returned not nil for unqualified unknown request: %v", queue) |
| queue = partition.GetQueue("root") |
| assert.Equal(t, queue, partition.root, "partition did not return root as requested") |
| |
| parentConf := configs.QueueConfig{ |
| Name: "parent", |
| Parent: true, |
| Queues: nil, |
| } |
| var parent *objects.Queue |
| // manually add the queue below the root |
| parent, err = objects.NewConfiguredQueue(parentConf, queue) |
| assert.NilError(t, err, "failed to create parent queue") |
| queue = partition.GetQueue("root.unknown") |
| assert.Equal(t, queue, nilQueue, "partition returned not nil for non existing queue name request: %v", queue) |
| queue = partition.GetQueue("root.parent") |
| assert.Equal(t, queue, parent, "partition returned nil for existing queue name request") |
| } |
| |
| func TestTryAllocate(t *testing.T) { |
| setupUGM() |
| partition := createQueuesNodes(t) |
| if partition == nil { |
| t.Fatal("partition create failed") |
| } |
| if alloc := partition.tryAllocate(); alloc != nil { |
| t.Fatalf("empty cluster allocate returned allocation: %v", alloc.String()) |
| } |
| |
| // create a set of queues and apps: app-1 2 asks; app-2 1 ask (same size) |
| // sub-leaf will have an app with 2 requests and thus more unconfirmed resources compared to root.leaf |
| // this should filter up in the parent and the 1st allocate should show an app-1 allocation |
| // the ask with the higher priority is the second one added alloc-2 for app-1 |
| app := newApplication(appID1, "default", "root.parent.sub-leaf") |
| |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "1"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| // add to the partition |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app-1 to partition") |
| err = app.AddAllocationAsk(newAllocationAsk(allocKey, appID1, res)) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-1") |
| err = app.AddAllocationAsk(newAllocationAskPriority("alloc-2", appID1, res, 2)) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app-1") |
| |
| app = newApplication(appID2, "default", "root.leaf") |
| // add to the partition |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app-2 to partition") |
| err = app.AddAllocationAsk(newAllocationAskPriority(allocKey, appID2, res, 2)) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-2") |
| |
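| // expected per queue max resources and max applications used by the user and group tracker assertions below |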
| expectedQueuesMaxLimits := make(map[string]map[string]interface{}) |
| expectedQueuesMaxLimits["root"] = make(map[string]interface{}) |
| expectedQueuesMaxLimits["root.leaf"] = make(map[string]interface{}) |
| expectedQueuesMaxLimits["root"][maxresources] = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10, "vcores": 10}) |
| expectedQueuesMaxLimits["root.leaf"][maxresources] = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 5, "vcores": 5}) |
| expectedQueuesMaxLimits["root"][maxapplications] = uint64(10) |
| expectedQueuesMaxLimits["root.leaf"][maxapplications] = uint64(1) |
| assertUserGroupResourceMaxLimits(t, getTestUserGroup(), nil, expectedQueuesMaxLimits) |
| |
| // first allocation should be app-1 and alloc-2 |
| alloc := partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") |
| assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations should have been 0") |
| assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application app-1 to be allocated") |
| assert.Equal(t, alloc.GetAllocationKey(), allocKey2, "expected ask alloc-2 to be allocated") |
| assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.Multiply(res, 1), expectedQueuesMaxLimits) |
| |
| // second allocation should be app-2 and alloc-1: higher up in the queue hierarchy |
| alloc = partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") |
| assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations should have been 0") |
| assert.Equal(t, alloc.GetApplicationID(), appID2, "expected application app-2 to be allocated") |
| assert.Equal(t, alloc.GetAllocationKey(), allocKey, "expected ask alloc-1 to be allocated") |
| assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.Multiply(res, 2), expectedQueuesMaxLimits) |
| |
| // third allocation should be app-1 and alloc-1 |
| alloc = partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") |
| assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations should have been 0") |
| assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application app-1 to be allocated") |
| assert.Equal(t, alloc.GetAllocationKey(), allocKey, "expected ask alloc-1 to be allocated") |
| assert.Assert(t, resources.IsZero(partition.root.GetPendingResource()), "pending resources should be set to zero") |
| assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.Multiply(res, 3), expectedQueuesMaxLimits) |
| } |
| |
| // allocate ask request with required node |
| func TestRequiredNodeReservation(t *testing.T) { |
| setupUGM() |
| partition := createQueuesNodes(t) |
| if partition == nil { |
| t.Fatal("partition create failed") |
| } |
| if alloc := partition.tryAllocate(); alloc != nil { |
| t.Fatalf("empty cluster allocate returned allocation: %v", alloc.String()) |
| } |
| |
| app := newApplication(appID1, "default", "root.parent.sub-leaf") |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "8"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| // add to the partition |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app-1 to partition") |
| ask := newAllocationAsk(allocKey, appID1, res) |
| ask.SetRequiredNode(nodeID1) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-1") |
| assertLimits(t, getTestUserGroup(), nil) |
| |
| // first allocation should be app-1 and alloc-1 |
| alloc := partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") |
| assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations should have been 0") |
| assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application app-1 to be allocated") |
| assert.Equal(t, alloc.GetAllocationKey(), allocKey, "expected ask alloc-1 to be allocated") |
| assertLimits(t, getTestUserGroup(), res) |
| |
| ask2 := newAllocationAsk(allocKey2, appID1, res) |
| ask2.SetRequiredNode(nodeID1) |
| err = app.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app-1") |
| alloc = partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("allocation attempt should have returned an allocation") |
| } |
| // check if updated (must be after allocate call) |
| assert.Equal(t, 1, len(app.GetReservations()), "app should have one reserved ask") |
| assert.Equal(t, 1, len(app.GetAskReservations(allocKey2)), "ask should have been reserved") |
| assertLimits(t, getTestUserGroup(), res) |
| |
| // a smaller ask that would fit on the node should still not be allocated |
| var res2 *resources.Resource |
| res2, err = resources.NewResourceFromConf(map[string]string{"vcore": "1"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| ask3 := newAllocationAsk("alloc-3", appID1, res2) |
| ask3.SetRequiredNode(nodeID1) |
| err = app.AddAllocationAsk(ask3) |
| assert.NilError(t, err, "failed to add ask alloc-3 to app-1") |
| alloc = partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("allocation attempt should not have returned an allocation") |
| } |
| |
| // reservation count remains same as last try allocate should have failed to find a reservation |
| assert.Equal(t, 1, len(app.GetReservations()), "ask should not have been reserved, count changed") |
| assertLimits(t, getTestUserGroup(), res) |
| } |
| |
| // allocate ask request with required node having non daemon set reservations |
| func TestRequiredNodeCancelNonDSReservations(t *testing.T) { |
| partition := createQueuesNodes(t) |
| if partition == nil { |
| t.Fatal("partition create failed") |
| } |
| if alloc := partition.tryAllocate(); alloc != nil { |
| t.Fatalf("empty cluster allocate returned allocation: %s", alloc) |
| } |
| |
| // override the reservation delay, and cleanup when done |
| objects.SetReservationDelay(10 * time.Nanosecond) |
| defer objects.SetReservationDelay(2 * time.Second) |
| |
| // turn off the second node |
| node2 := partition.GetNode(nodeID2) |
| node2.SetSchedulable(false) |
| |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "7"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| // only one resource for alloc fits on a node |
| app := newApplication(appID1, "default", "root.parent.sub-leaf") |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app-1 to partition") |
| |
| ask := newAllocationAsk("alloc-1", appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask to app") |
| ask = newAllocationAsk("alloc-2", appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask to app") |
| // pending resources should now be twice the single ask size (two identical asks) |
| res.MultiplyTo(2) |
| assert.Assert(t, resources.Equals(res, app.GetPendingResource()), "pending resource not set as expected") |
| assert.Assert(t, resources.Equals(res, partition.root.GetPendingResource()), "pending resource not set as expected on root queue") |
| |
| // the first one should be allocated |
| alloc := partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("1st allocation did not return the correct allocation") |
| } |
| assert.Equal(t, objects.Allocated, alloc.GetResult(), "allocation result should have been allocated") |
| |
| // the second one should be reserved as the 2nd node is not scheduling |
| alloc = partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("2nd allocation did not return the correct allocation") |
| } |
| // check if updated (must be after allocate call) |
| assert.Equal(t, 1, len(app.GetReservations()), "ask should have been reserved") |
| assert.Equal(t, 1, len(app.GetQueue().GetReservedApps()), "queue reserved apps should be 1") |
| |
| res1, err := resources.NewResourceFromConf(map[string]string{"vcore": "1"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| // only one resource for alloc fits on a node |
| app1 := newApplication(appID2, "default", "root.parent.sub-leaf") |
| err = partition.AddApplication(app1) |
| assert.NilError(t, err, "failed to add app-2 to partition") |
| |
| // required node set on ask |
| ask2 := newAllocationAsk("alloc-2", appID2, res1) |
| ask2.SetRequiredNode(nodeID1) |
| err = app1.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app-1") |
| |
| alloc = partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("1st allocation did not return the correct allocation") |
| } |
| assert.Equal(t, objects.Allocated, alloc.GetResult(), "allocation result should have been allocated") |
| |
| // the earlier app's (app-1) non daemon set reservation should have been cancelled |
| assert.Equal(t, 0, len(app.GetReservations()), "earlier reservation should have been cancelled") |
| assert.Equal(t, 0, len(app.GetQueue().GetReservedApps()), "queue reserved apps should be 0") |
| } |
| |
| // allocate ask request with required node having daemon set reservations |
| func TestRequiredNodeCancelDSReservations(t *testing.T) { |
| partition := createQueuesNodes(t) |
| if partition == nil { |
| t.Fatal("partition create failed") |
| } |
| if alloc := partition.tryAllocate(); alloc != nil { |
| t.Fatalf("empty cluster allocate returned allocation: %s", alloc) |
| } |
| |
| // override the reservation delay, and cleanup when done |
| objects.SetReservationDelay(10 * time.Nanosecond) |
| defer objects.SetReservationDelay(2 * time.Second) |
| |
| // turn off the second node |
| node2 := partition.GetNode(nodeID2) |
| node2.SetSchedulable(false) |
| |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "7"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| // only one resource for alloc fits on a node |
| app := newApplication(appID1, "default", "root.parent.sub-leaf") |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app-1 to partition") |
| |
| ask := newAllocationAsk("alloc-1", appID1, res) |
| ask.SetRequiredNode(nodeID1) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask 1 to app") |
| ask = newAllocationAsk("alloc-2", appID1, res) |
| ask.SetRequiredNode(nodeID1) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask 2 to app") |
| // pending resources should now be twice the single ask size (two identical asks) |
| res.MultiplyTo(2) |
| assert.Assert(t, resources.Equals(res, app.GetPendingResource()), "pending resource not set as expected") |
| assert.Assert(t, resources.Equals(res, partition.root.GetPendingResource()), "pending resource not set as expected on root queue") |
| |
| // the first one should be allocated |
| alloc := partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("1st allocation did not return the correct allocation") |
| } |
| assert.Equal(t, objects.Allocated, alloc.GetResult(), "allocation result should have been allocated") |
| |
| // the second one should be reserved as the 2nd node is not scheduling |
| alloc = partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("2nd allocation should not return allocation") |
| } |
| // check if updated (must be after allocate call) |
| assert.Equal(t, 1, len(app.GetReservations()), "ask should have been reserved") |
| assert.Equal(t, 1, len(app.GetQueue().GetReservedApps()), "queue reserved apps should be 1") |
| |
| res1, err := resources.NewResourceFromConf(map[string]string{"vcore": "1"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| // only one resource for alloc fits on a node |
| app1 := newApplication(appID2, "default", "root.parent.sub-leaf") |
| err = partition.AddApplication(app1) |
| assert.NilError(t, err, "failed to add app-2 to partition") |
| |
| // required node set on ask |
| ask2 := newAllocationAsk("alloc-2", appID2, res1) |
| ask2.SetRequiredNode(nodeID1) |
| err = app1.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app-1") |
| |
| alloc = partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("3rd allocation should not return allocation") |
| } |
| // still reservation count is 1 |
| assert.Equal(t, 1, len(app.GetReservations()), "ask should have been reserved") |
| assert.Equal(t, 1, len(app.GetQueue().GetReservedApps()), "queue reserved apps should be 1") |
| } |
| |
| func TestRequiredNodeNotExist(t *testing.T) { |
| setupUGM() |
| partition := createQueuesNodes(t) |
| if partition == nil { |
| t.Fatal("partition create failed") |
| } |
| if alloc := partition.tryAllocate(); alloc != nil { |
| t.Fatalf("empty cluster allocate returned allocation: %v", alloc.String()) |
| } |
| |
| // add app to the partition |
| app := newApplication(appID1, "default", "root.parent.sub-leaf") |
| err := partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app-1 to partition") |
| |
| // normal ask with node that does not exist |
| var res *resources.Resource |
| res, err = resources.NewResourceFromConf(map[string]string{"vcore": "1"}) |
| assert.NilError(t, err, "failed to create resource") |
| ask := newAllocationAsk(allocKey, appID1, res) |
| ask.SetRequiredNode("unknown") |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-1") |
| |
| // the allocate attempt must not panic even though the required node does not exist |
| defer func() { |
| if r := recover(); r != nil { |
| t.Fatal("panic while allocating with a non-existent required node") |
| } |
| }() |
| alloc := partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("allocation should not have worked on unknown node") |
| } |
| assertLimits(t, getTestUserGroup(), nil) |
| } |
| |
| // basic ds scheduling on specific node in first allocate run itself (without any need for reservation) |
| func TestRequiredNodeAllocation(t *testing.T) { |
| partition := createQueuesNodes(t) |
| if partition == nil { |
| t.Fatal("partition create failed") |
| } |
| if alloc := partition.tryAllocate(); alloc != nil { |
| t.Fatalf("empty cluster allocate returned allocation: %v", alloc.String()) |
| } |
| |
| app := newApplication(appID1, "default", "root.parent.sub-leaf") |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "4"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| // add to the partition |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app-1 to partition") |
| |
| // normal ask |
| ask := newAllocationAsk(allocKey, appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-1") |
| |
| // first allocation should be app-1 and alloc-1 |
| alloc := partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") |
| assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations should have been 0") |
| assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application app-1 to be allocated") |
| assert.Equal(t, alloc.GetAllocationKey(), allocKey, "expected ask alloc-1 to be allocated") |
| assertLimits(t, getTestUserGroup(), res) |
| |
| // required node set on ask |
| ask2 := newAllocationAsk(allocKey2, appID1, res) |
| ask2.SetRequiredNode(nodeID1) |
| err = app.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app-1") |
| |
| // since node-1 available resource is larger than required ask gets allocated |
| alloc = partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| // ensure there is no reservations |
| assert.Equal(t, 0, len(app.GetReservations()), "ask should not have been reserved") |
| assert.Equal(t, alloc.GetAllocationKey(), allocKey2, "expected ask alloc-2 to be allocated") |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") |
| assertLimits(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| } |
| |
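| // assertPreemptedResource checks the tracked preempted resource usage in the application summary: |
| // a value of -1 means the resource type is expected to be absent from the tracked map |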
| func assertPreemptedResource(t *testing.T, appSummary *objects.ApplicationSummary, memorySeconds int64, |
| vcoreSeconds int64) { |
| detailedResource := appSummary.PreemptedResource.TrackedResourceMap["UNKNOWN"] |
| memValue, memPresent := detailedResource["memory"] |
| vcoreValue, vcorePresent := detailedResource["vcore"] |
| |
| if memorySeconds != -1 { |
| assert.Equal(t, memorySeconds, memValue) |
| } else { |
| assert.Equal(t, memPresent, false) |
| } |
| |
| if vcoreSeconds != -1 { |
| assert.Equal(t, vcoreSeconds, vcoreValue) |
| } else { |
| assert.Equal(t, vcorePresent, false) |
| } |
| } |
| |
| func TestPreemption(t *testing.T) { |
| setupUGM() |
| partition, app1, app2, alloc1, alloc2 := setupPreemption(t) |
| |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "5"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| // ask 3 |
| ask3 := newAllocationAskPreempt(allocKey3, appID2, 1, res) |
| err = app2.AddAllocationAsk(ask3) |
| assert.NilError(t, err, "failed to add ask alloc-3 to app-2") |
| |
| // delay so that preemption delay passes |
| // also make the delay 1 second to have a minimum non-zero resource*seconds measurement for preempted resources |
| time.Sleep(time.Second) |
| |
| // third allocation should not succeed, as we are currently above capacity |
| alloc := partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("unexpected allocation") |
| } |
| |
| // alloc-2 (as it is newer) should now be marked preempted |
| assert.Assert(t, !alloc1.IsPreempted(), "alloc-1 is preempted") |
| assert.Assert(t, alloc2.IsPreempted(), "alloc-2 is not preempted") |
| |
| // allocation should still not do anything as we have not yet released the preempted allocation |
| // but the ask should have a reservation |
| alloc = partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("unexpected allocation") |
| } |
| |
| // currently preempting resources in victim queue should be updated |
| preemptingRes := partition.GetQueue("root.parent.leaf1").GetPreemptingResource() |
| assert.Assert(t, resources.Equals(preemptingRes, res), "incorrect preempting resources") |
| |
| // release alloc-2 |
| partition.removeAllocation(&si.AllocationRelease{ |
| PartitionName: "default", |
| ApplicationID: appID1, |
| AllocationKey: alloc2.GetAllocationKey(), |
| TerminationType: si.TerminationType_STOPPED_BY_RM, |
| Message: "Preempted", |
| }) |
| |
| // currently preempting resources in victim queue should be zero |
| preemptingRes = partition.GetQueue("root.parent.leaf1").GetPreemptingResource() |
| assert.Assert(t, resources.IsZero(preemptingRes), "incorrect preempting resources") |
| |
| // allocation should now allocate |
| alloc = partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("missing allocation") |
| } |
| assert.Equal(t, 0, len(app2.GetReservations()), "ask should not be reserved") |
| assert.Equal(t, alloc.GetResult(), objects.AllocatedReserved, "result should be allocated from reservation") |
| assert.Equal(t, alloc.GetAllocationKey(), allocKey3, "expected ask alloc-3 to be allocated") |
| assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000}), getExpectedQueuesLimitsForPreemption()) |
| |
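| // app-1 had alloc-2 preempted so its summary must track the preempted vcore usage, app-2 must track zero |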
| appSummary := app1.GetApplicationSummary("default") |
| assertPreemptedResource(t, appSummary, -1, 5000) |
| |
| appSummary = app2.GetApplicationSummary("default") |
| assertPreemptedResource(t, appSummary, -1, 0) |
| } |
| |
| // Preemption followed by a normal allocation |
| func TestPreemptionForRequiredNodeNormalAlloc(t *testing.T) { |
| setupUGM() |
| // setup the partition so we can try the real allocation |
| partition, app := setupPreemptionForRequiredNode(t) |
| // now try the allocation again: the normal path |
| alloc := partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("allocations should not have returned an allocation") |
| } |
| // check if updated (must be after allocate call) |
| assert.Equal(t, 1, len(app.GetReservations()), "ask should have been reserved") |
| } |
| |
| // Preemption followed by a reserved allocation |
| func TestPreemptionForRequiredNodeReservedAlloc(t *testing.T) { |
| setupUGM() |
| // setup the partition so we can try the real allocation |
| partition, app := setupPreemptionForRequiredNode(t) |
| // now try the allocation again: the reserved path |
| alloc := partition.tryReservedAllocate() |
| if alloc == nil { |
| t.Fatal("allocation attempt should have returned an allocation") |
| } |
| // check if updated (must be after allocate call) |
| assert.Equal(t, 0, len(app.GetReservations()), "ask should have no longer be reserved") |
| assert.Equal(t, alloc.GetResult(), objects.AllocatedReserved, "result is not the expected AllocatedReserved") |
| assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations should have been 0") |
| assert.Equal(t, alloc.GetAllocationKey(), allocKey2, "expected ask alloc-2 to be allocated") |
| assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000}), getExpectedQueuesLimitsForPreemption()) |
| } |
| |
| func TestPreemptionForRequiredNodeMultipleAttemptsAvoided(t *testing.T) { |
| partition := createQueuesNodes(t) |
| if partition == nil { |
| t.Fatal("partition create failed") |
| } |
| |
| app, testHandler := newApplicationWithHandler(appID1, "default", "root.parent.sub-leaf") |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "8"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| // add to the partition |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app-1 to partition") |
| |
| // normal ask |
| ask := newAllocationAsk(allocKey, appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-1") |
| |
| // first allocation should be app-1 and alloc-1 |
| alloc := partition.tryAllocate() |
| |
| // required node set on ask |
| ask2 := newAllocationAsk(allocKey2, appID1, res) |
| ask2.SetRequiredNode(nodeID1) |
| err = app.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app-1") |
| partition.tryAllocate() |
| |
| // try multiple reserved allocation |
| partition.tryReservedAllocate() |
| partition.tryReservedAllocate() |
| partition.tryReservedAllocate() |
| partition.tryReservedAllocate() |
| |
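| // even though the reserved allocate ran multiple times, only a single release event for alloc-1 should have been sent |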
| var eventCount int |
| for _, event := range testHandler.GetEvents() { |
| if allocRelease, ok := event.(*rmevent.RMReleaseAllocationEvent); ok { |
| if allocRelease.ReleasedAllocations[0].AllocationKey == allocKey { |
| eventCount++ |
| } |
| } |
| } |
| assert.Equal(t, 1, eventCount) |
| assert.Equal(t, true, ask2.HasTriggeredPreemption()) |
| assert.Equal(t, true, alloc.IsPreempted()) |
| } |
| |
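| // getExpectedQueuesLimitsForPreemption builds the per queue max resources and max applications map used by the |
| // user and group tracker assertions in the preemption tests |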
| func getExpectedQueuesLimitsForPreemption() map[string]map[string]interface{} { |
| expectedQueuesMaxLimits := make(map[string]map[string]interface{}) |
| expectedQueuesMaxLimits["root"] = make(map[string]interface{}) |
| expectedQueuesMaxLimits["root.parent"] = make(map[string]interface{}) |
| expectedQueuesMaxLimits["root.parent.leaf1"] = make(map[string]interface{}) |
| expectedQueuesMaxLimits["root"][maxresources] = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10, "vcores": 10}) |
| expectedQueuesMaxLimits["root.parent"][maxresources] = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 5, "vcores": 5}) |
| expectedQueuesMaxLimits["root.parent.leaf1"][maxresources] = expectedQueuesMaxLimits["root.parent"][maxresources] |
| expectedQueuesMaxLimits["root"][maxapplications] = uint64(10) |
| expectedQueuesMaxLimits["root.parent"][maxapplications] = uint64(8) |
| expectedQueuesMaxLimits["root.parent.leaf1"][maxapplications] = uint64(8) |
| return expectedQueuesMaxLimits |
| } |
| |
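| // getExpectedQueuesLimitsForPreemptionWithRequiredNode builds the per queue limits map used by the required node preemption tests |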
| func getExpectedQueuesLimitsForPreemptionWithRequiredNode() map[string]map[string]interface{} { |
| expectedQueuesMaxLimits := make(map[string]map[string]interface{}) |
| expectedQueuesMaxLimits["root"] = make(map[string]interface{}) |
| expectedQueuesMaxLimits["root.leaf"] = make(map[string]interface{}) |
| expectedQueuesMaxLimits["root.parent"] = make(map[string]interface{}) |
| expectedQueuesMaxLimits["root.parent.sub-leaf"] = make(map[string]interface{}) |
| expectedQueuesMaxLimits["root"][maxresources] = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10, "vcores": 10}) |
| expectedQueuesMaxLimits["root.leaf"][maxresources] = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 5, "vcores": 5}) |
| expectedQueuesMaxLimits["root.parent"][maxresources] = expectedQueuesMaxLimits["root.leaf"][maxresources] |
| expectedQueuesMaxLimits["root.parent.sub-leaf"][maxresources] = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 3, "vcores": 3}) |
| expectedQueuesMaxLimits["root"][maxapplications] = uint64(10) |
| expectedQueuesMaxLimits["root.leaf"][maxapplications] = uint64(1) |
| expectedQueuesMaxLimits["root.parent"][maxapplications] = uint64(8) |
| expectedQueuesMaxLimits["root.parent.sub-leaf"][maxapplications] = uint64(2) |
| return expectedQueuesMaxLimits |
| } |
| |
| // setup the partition with existing allocations so we can test preemption |
| func setupPreemption(t *testing.T) (*PartitionContext, *objects.Application, *objects.Application, *objects.Allocation, *objects.Allocation) { |
| partition := createPreemptionQueuesNodes(t) |
| if partition == nil { |
| t.Fatal("partition create failed") |
| } |
| if alloc := partition.tryAllocate(); alloc != nil { |
| t.Fatalf("empty cluster allocate returned allocation: %v", alloc.String()) |
| } |
| |
| app1, _ := newApplicationWithHandler(appID1, "default", "root.parent.leaf1") |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "5"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| // add to the partition |
| err = partition.AddApplication(app1) |
| assert.NilError(t, err, "failed to add app-1 to partition") |
| |
| // ask 1 |
| ask := newAllocationAskPreempt(allocKey, appID1, 2, res) |
| err = app1.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-1") |
| |
| // first allocation should be app-1 and alloc-1 |
| alloc1 := partition.tryAllocate() |
| if alloc1 == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc1.GetResult(), objects.Allocated, "result is not the expected allocated") |
| assert.Equal(t, alloc1.GetReleaseCount(), 0, "released allocations should have been 0") |
| assert.Equal(t, alloc1.GetApplicationID(), appID1, "expected application app-1 to be allocated") |
| assert.Equal(t, alloc1.GetAllocationKey(), allocKey, "expected ask alloc-1 to be allocated") |
| assert.Equal(t, alloc1.GetNodeID(), nodeID1, "expected alloc-1 on node-1") |
| |
| assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 5000}), getExpectedQueuesLimitsForPreemption()) |
| |
| // ask 2 |
| ask2 := newAllocationAskPreempt(allocKey2, appID1, 1, res) |
| err = app1.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app-1") |
| |
| // second allocation should be app-1 and alloc-2 |
| alloc2 := partition.tryAllocate() |
| if alloc2 == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc2.GetResult(), objects.Allocated, "result is not the expected allocated") |
| assert.Equal(t, alloc2.GetReleaseCount(), 0, "released allocations should have been 0") |
| assert.Equal(t, alloc2.GetApplicationID(), appID1, "expected application app-1 to be allocated") |
| assert.Equal(t, alloc2.GetAllocationKey(), allocKey2, "expected ask alloc-2 to be allocated") |
| assert.Equal(t, alloc2.GetNodeID(), nodeID2, "expected alloc-2 on node-2") |
| assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000}), getExpectedQueuesLimitsForPreemption()) |
| |
| app2, _ := newApplicationWithHandler(appID2, "default", "root.parent.leaf2") |
| |
| // add to the partition |
| err = partition.AddApplication(app2) |
| assert.NilError(t, err, "failed to add app-1 to partition") |
| |
| return partition, app1, app2, alloc1, alloc2 |
| } |
| |
| // setup the partition in a state that we need for multiple tests |
| func setupPreemptionForRequiredNode(t *testing.T) (*PartitionContext, *objects.Application) { |
| partition := createQueuesNodes(t) |
| if partition == nil { |
| t.Fatal("partition create failed") |
| } |
| if alloc := partition.tryAllocate(); alloc != nil { |
| t.Fatalf("empty cluster allocate returned allocation: %v", alloc.String()) |
| } |
| |
| app, testHandler := newApplicationWithHandler(appID1, "default", "root.parent.sub-leaf") |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "8"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| // add to the partition |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app-1 to partition") |
| |
| // normal ask |
| ask := newAllocationAsk(allocKey, appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-1") |
| |
| // first allocation should be app-1 and alloc-1 |
| alloc := partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") |
| assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations should have been 0") |
| assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application app-1 to be allocated") |
| assert.Equal(t, alloc.GetAllocationKey(), allocKey, "expected ask alloc-1 to be allocated") |
| assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000}), getExpectedQueuesLimitsForPreemptionWithRequiredNode()) |
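| // keep the allocation key: this allocation becomes the preemption victim that is released later in this setup |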
| allocationKey := alloc.GetAllocationKey() |
| |
| // required node set on ask |
| ask2 := newAllocationAsk(allocKey2, appID1, res) |
| ask2.SetRequiredNode(nodeID1) |
| err = app.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app-1") |
| |
| // since node-1 available resource is less than needed, reserve the node for alloc-2 |
| alloc = partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("allocation attempt should not have returned an allocation") |
| } |
| // check if updated (must be after allocate call) |
| assert.Equal(t, 1, len(app.GetReservations()), "ask should have been reserved") |
| assert.Equal(t, 1, len(app.GetAskReservations(allocKey2)), "ask should have been reserved") |
| assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000}), getExpectedQueuesLimitsForPreemptionWithRequiredNode()) |
| |
| // try through reserved scheduling cycle this should trigger preemption |
| alloc = partition.tryReservedAllocate() |
| if alloc != nil { |
| t.Fatal("reserved allocation attempt should not have returned an allocation") |
| } |
| |
| // check if there is a release event for the expected allocation |
| var found bool |
| for _, event := range testHandler.GetEvents() { |
| if allocRelease, ok := event.(*rmevent.RMReleaseAllocationEvent); ok { |
| found = allocRelease.ReleasedAllocations[0].AllocationKey == allocKey |
| break |
| } |
| } |
| assert.Assert(t, found, "release allocation event not found in list") |
| // release allocation: do what the context would do after the shim processing |
| release := &si.AllocationRelease{ |
| PartitionName: partition.Name, |
| ApplicationID: appID1, |
| AllocationKey: allocationKey, |
| TerminationType: si.TerminationType_PREEMPTED_BY_SCHEDULER, |
| } |
| releases, _ := partition.removeAllocation(release) |
| assert.Equal(t, 0, len(releases), "not expecting any released allocations") |
| assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 0}), getExpectedQueuesLimitsForPreemptionWithRequiredNode()) |
| return partition, app |
| } |
| |
| func TestTryAllocateLarge(t *testing.T) { |
| setupUGM() |
| partition := createQueuesNodes(t) |
| if partition == nil { |
| t.Fatal("partition create failed") |
| } |
| if alloc := partition.tryAllocate(); alloc != nil { |
| t.Fatalf("empty cluster allocate returned allocation: %v", alloc.String()) |
| } |
| |
| // override the reservation delay, and cleanup when done |
| objects.SetReservationDelay(10 * time.Nanosecond) |
| defer objects.SetReservationDelay(2 * time.Second) |
| |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "100"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| app := newApplication(appID1, "default", "root.parent.sub-leaf") |
| // add to the partition |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app-1 to partition") |
| |
| err = app.AddAllocationAsk(newAllocationAsk("alloc-1", appID1, res)) |
| assert.NilError(t, err, "failed to add large ask to app") |
| assert.Assert(t, resources.Equals(res, app.GetPendingResource()), "pending resource not set as expected") |
| alloc := partition.tryAllocate() |
| if alloc != nil { |
| t.Fatalf("allocation did return allocation which does not fit: %s", alloc) |
| } |
| assert.Equal(t, 0, len(app.GetReservations()), "ask should not have been reserved") |
| assertLimits(t, getTestUserGroup(), nil) |
| } |
| |
| func TestAllocReserveNewNode(t *testing.T) { |
| setupUGM() |
| partition := createQueuesNodes(t) |
| if partition == nil { |
| t.Fatal("partition create failed") |
| } |
| if alloc := partition.tryAllocate(); alloc != nil { |
| t.Fatalf("empty cluster allocate returned allocation: %s", alloc) |
| } |
| |
| // override the reservation delay, and cleanup when done |
| objects.SetReservationDelay(10 * time.Nanosecond) |
| defer objects.SetReservationDelay(2 * time.Second) |
| |
| // turn off the second node |
| node2 := partition.GetNode(nodeID2) |
| node2.SetSchedulable(false) |
| |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "8"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| // only one resource for alloc fits on a node |
| app := newApplication(appID1, "default", "root.parent.sub-leaf") |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app-1 to partition") |
| |
| ask := newAllocationAsk("alloc-1", appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask to app") |
| ask2 := newAllocationAsk("alloc-2", appID1, res) |
| err = app.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "failed to add ask2 to app") |
| // calculate the resource size for two asks |
| res.MultiplyTo(2) |
| assert.Assert(t, resources.Equals(res, app.GetPendingResource()), "pending resource not set as expected") |
| assert.Assert(t, resources.Equals(res, partition.root.GetPendingResource()), "pending resource not set as expected on root queue") |
| |
| // the first one should be allocated |
| alloc := partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("1st allocation did not return the correct allocation") |
| } |
| assert.Equal(t, objects.Allocated, alloc.GetResult(), "allocation result should have been allocated") |
| assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000})) |
| |
| // the second one should be reserved as the 2nd node is not scheduling |
| alloc = partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("2nd allocation did not return the correct allocation") |
| } |
| // check if updated (must be after allocate call) |
| assert.Equal(t, 1, len(app.GetReservations()), "ask should have been reserved") |
| assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 8000})) |
| |
| // turn on 2nd node |
| node2.SetSchedulable(true) |
| alloc = partition.tryReservedAllocate() |
| assert.Equal(t, objects.AllocatedReserved, alloc.GetResult(), "allocation result should have been allocatedReserved") |
| assert.Equal(t, "", alloc.GetReservedNodeID(), "reserved node should be reset after processing") |
| assert.Equal(t, node2.NodeID, alloc.GetNodeID(), "allocation should be fulfilled on new node") |
| // check if all updated |
| node1 := partition.GetNode(nodeID1) |
| assert.Equal(t, 0, len(node1.GetReservationKeys()), "old node should have no more reservations") |
| assert.Equal(t, 0, len(app.GetReservations()), "ask should have been reserved") |
| assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 16000})) |
| } |
| |
| func TestTryAllocateReserve(t *testing.T) { |
| setupUGM() |
| partition := createQueuesNodes(t) |
| if partition == nil { |
| t.Fatal("partition create failed") |
| } |
| if alloc := partition.tryReservedAllocate(); alloc != nil { |
| t.Fatalf("empty cluster reserved allocate returned allocation: %s", alloc) |
| } |
| |
| // create one app with two asks of the same size |
| // alloc-2 is manually reserved on node-2 below, so the reserved allocate path |
| // must confirm it before the normal allocate path picks up alloc-1 |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "1"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| app := newApplication(appID1, "default", "root.parent.sub-leaf") |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app-1 to partition") |
| |
| err = app.AddAllocationAsk(newAllocationAsk("alloc-1", appID1, res)) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app") |
| |
| ask := newAllocationAsk("alloc-2", appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app") |
| node2 := partition.GetNode(nodeID2) |
| if node2 == nil { |
| t.Fatal("expected node-2 to be returned got nil") |
| } |
| partition.reserve(app, node2, ask) |
| if !app.IsReservedOnNode(node2.NodeID) || len(app.GetAskReservations("alloc-2")) == 0 { |
| t.Fatalf("reservation failure for ask2 and node2") |
| } |
| |
| // first allocation should be app-1 and alloc-2 |
| alloc := partition.tryReservedAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc.GetResult(), objects.AllocatedReserved, "result is not the expected allocated from reserved") |
| assert.Equal(t, alloc.GetReservedNodeID(), "", "node should not be set for allocated from reserved") |
| assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations should have been 0") |
| assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application app-1 to be allocated") |
| assert.Equal(t, alloc.GetAllocationKey(), "alloc-2", "expected ask alloc-2 to be allocated") |
| assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1000})) |
| |
| // reservations should have been removed: it is in progress |
| if app.IsReservedOnNode(node2.NodeID) || len(app.GetAskReservations("alloc-2")) != 0 { |
| t.Fatalf("reservation removal failure for ask2 and node2") |
| } |
| |
| // no reservations left this should return nil |
| alloc = partition.tryReservedAllocate() |
| if alloc != nil { |
| t.Fatalf("reserved allocation should not return any allocation: %s, '%s'", alloc, alloc.GetReservedNodeID()) |
| } |
| // try non reserved this should allocate |
| alloc = partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") |
| assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations should have been 0") |
| assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application app-1 to be allocated") |
| assert.Equal(t, alloc.GetAllocationKey(), "alloc-1", "expected ask alloc-1 to be allocated") |
| assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 2000})) |
| |
| if !resources.IsZero(partition.root.GetPendingResource()) { |
| t.Fatalf("pending allocations should be set to zero") |
| } |
| } |
| |
| func TestTryAllocateWithReserved(t *testing.T) { |
| setupUGM() |
| partition := createQueuesNodes(t) |
| if partition == nil { |
| t.Fatal("partition create failed") |
| } |
| if alloc := partition.tryReservedAllocate(); alloc != nil { |
| t.Fatalf("empty cluster reserved allocate returned allocation: %v", alloc) |
| } |
| |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "5"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| app := newApplication(appID1, "default", "root.parent.sub-leaf") |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app-1 to partition") |
| |
| ask := newAllocationAsk("alloc-1", appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app") |
| |
| ask2 := newAllocationAsk("alloc-2", appID1, res) |
| err = app.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app") |
| |
| // reserve one node: scheduling should happen on the other |
| node2 := partition.GetNode(nodeID2) |
| if node2 == nil { |
| t.Fatal("expected node-2 to be returned got nil") |
| } |
| partition.reserve(app, node2, ask) |
| if !app.IsReservedOnNode(node2.NodeID) || len(app.GetAskReservations("alloc-1")) == 0 { |
| t.Fatal("reservation failure for ask and node2") |
| } |
| alloc := partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return correct an allocation") |
| } |
| assert.Equal(t, objects.AllocatedReserved, alloc.GetResult(), "expected reserved allocation to be returned") |
| assert.Equal(t, "", alloc.GetReservedNodeID(), "reserved node should be reset after processing") |
| assert.Equal(t, 0, len(node2.GetReservationKeys()), "reservation should have been removed from node") |
| assert.Equal(t, false, app.IsReservedOnNode(node2.NodeID), "reservation cleanup for ask on app failed") |
| assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 5000})) |
| |
| // node2 is unreserved now so the next one should allocate on the 2nd node (fair sharing) |
| alloc = partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return correct allocation") |
| } |
| assert.Equal(t, objects.Allocated, alloc.GetResult(), "expected allocated allocation to be returned") |
| assert.Equal(t, node2.NodeID, alloc.GetNodeID(), "expected allocation on node2 to be returned") |
| assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000})) |
| } |
| |
| // remove the reserved ask while allocating in flight for the ask |
| func TestScheduleRemoveReservedAsk(t *testing.T) { |
| setupUGM() |
| partition := createQueuesNodes(t) |
| if partition == nil { |
| t.Fatal("partition create failed") |
| } |
| if alloc := partition.tryAllocate(); alloc != nil { |
| t.Fatalf("empty cluster allocate returned allocation: %s", alloc) |
| } |
| |
| // override the reservation delay, and cleanup when done |
| objects.SetReservationDelay(10 * time.Nanosecond) |
| defer objects.SetReservationDelay(2 * time.Second) |
| |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "4"}) |
| assert.NilError(t, err, "resource creation failed") |
| app := newApplication(appID1, "default", "root.parent.sub-leaf") |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app app-1 to partition") |
| for i := 1; i <= 4; i++ { |
| ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, fmt.Sprintf("failed to add ask alloc-%d to app", i)) |
| } |
| |
| // calculate the total pending resource size for the four asks |
| pending := resources.Multiply(res, 4) |
| assert.Assert(t, resources.Equals(pending, app.GetPendingResource()), "pending resource not set as expected") |
| |
| // allocate the ask |
| for i := 1; i <= 4; i++ { |
| alloc := partition.tryAllocate() |
| if alloc == nil || alloc.GetResult() != objects.Allocated { |
| t.Fatalf("expected allocated allocation to be returned (step %d) %s", i, alloc) |
| } |
| } |
| assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 16000})) |
| |
| // add two asks which should reserve
| ask := newAllocationAsk("alloc-5", appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-5 to app") |
| ask = newAllocationAsk("alloc-6", appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-6 to app") |
| pending = resources.Multiply(res, 2) |
| assert.Assert(t, resources.Equals(pending, app.GetPendingResource()), "pending resource not set as expected") |
| // allocate so we get reservations |
| for i := 1; i <= 2; i++ { |
| alloc := partition.tryAllocate() |
| if alloc != nil { |
| t.Fatalf("expected reservations to be created not allocation to be returned (step %d) %s", i, alloc) |
| } |
| assert.Equal(t, len(app.GetReservations()), i, "application reservations incorrect") |
| } |
| assert.Equal(t, len(app.GetReservations()), 2, "application reservations should be 2") |
| assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 16000})) |
| |
| // add a node |
| node := newNodeMaxResource("node-3", res) |
| err = partition.AddNode(node, nil) |
| assert.NilError(t, err, "failed to add node node-3 to the partition") |
| // Try to allocate one of the reservations. We go directly to the root queue, not via the partition, otherwise
| // the allocation is confirmed before we get back into the test code and we cannot remove the ask
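|	// one of the two reserved asks (alloc-5 or alloc-6) is expected to come back as an AllocatedReserved result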
| alloc := partition.root.TryReservedAllocate(partition.GetNodeIterator) |
| if alloc == nil || alloc.GetResult() != objects.AllocatedReserved { |
| t.Fatalf("expected allocatedReserved allocation to be returned %v", alloc) |
| } |
| assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 20000})) |
| |
| // before confirming remove the ask: do what the scheduler does when it gets a request from a |
| // shim in processAllocationReleaseByAllocationKey() |
| // make sure we are counting correctly and leave the other reservation intact |
| removeAskID := "alloc-5" |
| if alloc.GetAllocationKey() == "alloc-6" { |
| removeAskID = "alloc-6" |
| } |
| released := app.RemoveAllocationAsk(removeAskID) |
| assert.Equal(t, released, 1, "expected one reservation to be released")
| assert.Equal(t, len(app.GetReservations()), 1, "application reservations should be 1") |
| assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 20000})) |
| |
| // now confirm the allocation: this should not remove the reservation |
| rmAlloc := partition.allocate(alloc) |
| assert.Equal(t, "", rmAlloc.GetReservedNodeID(), "reserved node should be reset after processing") |
| assert.Equal(t, len(app.GetReservations()), 1, "application reservations should be kept at 1") |
| assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 20000})) |
| } |
| |
| // update the config with nodes registered and make sure that the root max and guaranteed are not changed |
| func TestUpdateRootQueue(t *testing.T) { |
| partition := createQueuesNodes(t) |
| if partition == nil { |
| t.Fatal("partition create failed") |
| } |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "20"}) |
| assert.NilError(t, err, "resource creation failed") |
| assert.Assert(t, resources.Equals(res, partition.totalPartitionResource), "partition resource not set as expected") |
| assert.Assert(t, resources.Equals(res, partition.root.GetMaxResource()), "root max resource not set as expected") |
| |
| conf := configs.PartitionConfig{ |
| Name: "test", |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "root", |
| Parent: true, |
| SubmitACL: "*", |
| Queues: nil, |
| Limits: []configs.Limit{ |
| { |
| Limit: "root queue limit", |
| Users: []string{ |
| "testuser", |
| }, |
| Groups: []string{ |
| "testgroup", |
| }, |
| MaxResources: map[string]string{ |
| "memory": "10", |
| "vcores": "10", |
| }, |
| MaxApplications: 2, |
| }, |
| }, |
| }, |
| }, |
| PlacementRules: nil, |
| Limits: nil, |
| NodeSortPolicy: configs.NodeSortingPolicy{}, |
| } |
| |
| err = partition.updatePartitionDetails(conf) |
| assert.NilError(t, err, "partition update failed") |
| // resources should not have changed |
| assert.Assert(t, resources.Equals(res, partition.totalPartitionResource), "partition resource not set as expected") |
| assert.Assert(t, resources.Equals(res, partition.root.GetMaxResource()), "root max resource not set as expected") |
| // make sure the update went through |
| assert.Equal(t, partition.GetQueue("root.leaf").CurrentState(), objects.Draining.String(), "leaf queue should have been marked for removal") |
| assert.Equal(t, partition.GetQueue("root.parent").CurrentState(), objects.Draining.String(), "parent queue should have been marked for removal") |
| |
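|	// the root queue max resource is not taken from the queue config: it tracks the total of the registered
|	// node resources, which is what the node add and remove steps below exercise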
| // add a new node, node-3, with the 'memory' resource type
| res1, err1 := resources.NewResourceFromConf(map[string]string{"vcore": "20", "memory": "50"}) |
| assert.NilError(t, err1, "resource creation failed") |
| err = partition.AddNode(newNodeMaxResource("node-3", res1), nil) |
| assert.NilError(t, err, "test node3 add failed unexpected") |
| |
| // root max resource gets updated with 'memory' resource type |
| expRes, err1 := resources.NewResourceFromConf(map[string]string{"vcore": "40", "memory": "50"}) |
| assert.NilError(t, err1, "resource creation failed") |
| assert.Assert(t, resources.Equals(expRes, partition.root.GetMaxResource()), "root max resource not set as expected") |
| |
| // remove node-3: the root max resource no longer has the 'memory' resource type and is updated with less 'vcore'
| partition.removeNode("node-3") |
| assert.Assert(t, resources.Equals(res, partition.root.GetMaxResource()), "root max resource not set as expected") |
| |
| // remove node-2: the root max resource gets updated with less 'vcore'
| partition.removeNode("node-2") |
| expRes1, err1 := resources.NewResourceFromConf(map[string]string{"vcore": "10"}) |
| assert.NilError(t, err1, "resource creation failed") |
| assert.Assert(t, resources.Equals(expRes1, partition.root.GetMaxResource()), "root max resource not set as expected") |
| |
| // remove node-1: the root max resource should be set to nil
| partition.removeNode("node-1") |
| assert.Assert(t, resources.Equals(nil, partition.root.GetMaxResource()), "root max resource not set as expected") |
| } |
| |
| // transition an application to completed state and wait for it to be processed into the completedApplications map |
| func completeApplicationAndWait(app *objects.Application, pc *PartitionContext) error { |
| currentCount := len(pc.GetCompletedApplications()) |
| err := app.HandleApplicationEvent(objects.CompleteApplication) |
| if err != nil { |
| return err |
| } |
| |
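|	// completion is processed asynchronously by the partition, so poll until the completed list has grown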
| err = common.WaitFor(10*time.Millisecond, time.Duration(1000)*time.Millisecond, func() bool { |
| newCount := len(pc.GetCompletedApplications()) |
| return newCount == currentCount+1 |
| }) |
| |
| return err |
| } |
| |
| func TestCompleteApp(t *testing.T) { |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| app := newApplication("completed", "default", defQueue) |
| app.SetState(objects.Completing.String()) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "no error expected while adding the application") |
| assert.Equal(t, 1, len(partition.GetApplications()), "the partition should have 1 app") |
| assert.Equal(t, 0, len(partition.GetCompletedApplications()), "the partition should not have any completed apps") |
| // complete the application |
| err = completeApplicationAndWait(app, partition) |
| assert.NilError(t, err, "the completed application should have been processed") |
| assert.Equal(t, 0, len(partition.GetApplications()), "the partition should have no active app") |
| assert.Equal(t, 1, len(partition.GetCompletedApplications()), "the partition should have 1 completed app") |
| } |
| |
| func TestCleanupFailedApps(t *testing.T) { |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| newApp1 := newApplication("newApp1", "default", defQueue) |
| newApp2 := newApplication("newApp2", "default", defQueue) |
| |
| err = partition.AddApplication(newApp1) |
| assert.NilError(t, err, "no error expected while adding the app") |
| err = partition.AddApplication(newApp2) |
| assert.NilError(t, err, "no error expected while adding the app") |
| |
| assert.Equal(t, 2, len(partition.GetApplications()), "the partition should have 2 apps") |
| newApp1.SetState(objects.Expired.String()) |
| assert.Equal(t, 1, len(partition.getAppsByState(objects.Expired.String())), "the partition should have 1 expired app")
| |
| partition.cleanupExpiredApps() |
| assert.Equal(t, 1, len(partition.GetApplications()), "the partition should have 1 app") |
| assert.Equal(t, 0, len(partition.getAppsByState(objects.Expired.String())), "the partition should have 0 expired apps") |
| } |
| |
| func TestCleanupCompletedApps(t *testing.T) { |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| completedApp1 := newApplication("completedApp1", "default", defQueue) |
| completedApp2 := newApplication("completedApp2", "default", defQueue) |
| |
| err = partition.AddApplication(completedApp1) |
| assert.NilError(t, err, "no error expected while adding the app") |
| err = partition.AddApplication(completedApp2) |
| assert.NilError(t, err, "no error expected while adding the app") |
| |
| assert.Equal(t, 2, len(partition.GetApplications()), "the partition should have 2 apps") |
| assert.Equal(t, 0, len(partition.GetCompletedApplications()), "the partition should have 0 completed apps") |
| |
| // complete the applications using the event system |
| completedApp1.SetState(objects.Completing.String()) |
| err = completeApplicationAndWait(completedApp1, partition) |
| assert.NilError(t, err, "the completed application should have been processed") |
| completedApp2.SetState(objects.Completing.String()) |
| err = completeApplicationAndWait(completedApp2, partition) |
| assert.NilError(t, err, "the completed application should have been processed") |
| |
| assert.Equal(t, 0, len(partition.GetApplications()), "the partition should have 0 apps") |
| assert.Equal(t, 2, len(partition.GetCompletedApplications()), "the partition should have 2 completed apps") |
| assert.Equal(t, 2, len(partition.getCompletedAppsByState(objects.Completed.String())), "the partition should have 2 completed apps") |
| |
| // mark the app for removal |
| completedApp1.SetState(objects.Expired.String()) |
| assert.Equal(t, 1, len(partition.getCompletedAppsByState(objects.Expired.String())), "the partition should have 1 expired app")
| assert.Equal(t, 1, len(partition.getCompletedAppsByState(objects.Completed.String())), "the partition should have 1 completed app")
| |
| partition.cleanupExpiredApps() |
| assert.Equal(t, 1, len(partition.GetCompletedApplications()), "the partition should have 1 completed app") |
| assert.Equal(t, 0, len(partition.getCompletedAppsByState(objects.Expired.String())), "the partition should have 0 expired apps") |
| } |
| |
| func TestCleanupRejectedApps(t *testing.T) { |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| rejectedApp := newApplication("new", "default", defQueue) |
| rejectedMessage := fmt.Sprintf("Failed to place application %s: application rejected: no placement rule matched", rejectedApp.ApplicationID) |
| |
| partition.AddRejectedApplication(rejectedApp, rejectedMessage) |
| cloneRejectedApp := partition.getRejectedApplication(rejectedApp.ApplicationID) |
| assert.Equal(t, rejectedApp, cloneRejectedApp) |
| assert.Equal(t, partition.rejectedApplications[rejectedApp.ApplicationID], cloneRejectedApp) |
| assert.Equal(t, cloneRejectedApp.GetRejectedMessage(), rejectedMessage) |
| assert.Equal(t, cloneRejectedApp.CurrentState(), objects.Rejected.String()) |
| assert.Assert(t, !cloneRejectedApp.FinishedTime().IsZero()) |
| |
| assert.Equal(t, 1, len(partition.rejectedApplications), "the rejectedApplications of the partition should have 1 app") |
| assert.Equal(t, 1, len(partition.getRejectedAppsByState(objects.Rejected.String())), "the partition should have 1 rejected app") |
| rejectedApp.SetState(objects.Expired.String()) |
| assert.Equal(t, 0, len(partition.getRejectedAppsByState(objects.Rejected.String())), "the partition should have 0 rejected apps")
| |
| partition.cleanupExpiredApps() |
| assert.Equal(t, 0, len(partition.rejectedApplications), "the partition should not have any rejected apps")
| assert.Assert(t, partition.getRejectedApplication(rejectedApp.ApplicationID) == nil, "rejected application should have been deleted")
| assert.Equal(t, 0, len(partition.getRejectedAppsByState(objects.Expired.String())), "the partition should have 0 expired apps")
| } |
| |
| func TestUpdateNode(t *testing.T) { |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "test partition create failed with error") |
| |
| newRes, err := resources.NewResourceFromConf(map[string]string{"memory": "400M", "vcore": "30"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| err = partition.AddNode(newNodeMaxResource("test", newRes), nil) |
| assert.NilError(t, err, "test node add failed unexpected") |
| assert.Equal(t, 1, partition.nodes.GetNodeCount(), "node list not correct") |
| |
| if !resources.Equals(newRes, partition.GetTotalPartitionResource()) { |
| t.Errorf("Expected partition resource %s, doesn't match with actual partition resource %s", newRes, partition.GetTotalPartitionResource()) |
| } |
| |
| // delta resource for a node with mem as 450 and vcores as 40 (both mem and vcores have increased)
| delta, err := resources.NewResourceFromConf(map[string]string{"memory": "50M", "vcore": "10"}) |
| assert.NilError(t, err, "failed to create resource") |
| partition.updatePartitionResource(delta) |
| |
| expectedRes, err := resources.NewResourceFromConf(map[string]string{"memory": "450M", "vcore": "40"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| if !resources.Equals(expectedRes, partition.GetTotalPartitionResource()) { |
| t.Errorf("Expected partition resource %s, doesn't match with actual partition resource %s", expectedRes, partition.GetTotalPartitionResource()) |
| } |
| |
| // delta resource for a node with mem as 400 and vcores as 30 (both mem and vcores have decreased)
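|	// note: NewResourceFromConf parses suffixed config values ("50M" memory -> 50000000, "10" vcore -> 10000
|	// milli vcore), while NewResourceFromMap takes the raw quantities directly, hence the large numbers here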
| delta = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": -50000000, "vcore": -10000}) |
| partition.updatePartitionResource(delta) |
| |
| expectedRes, err = resources.NewResourceFromConf(map[string]string{"memory": "400M", "vcore": "30"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| if !resources.Equals(expectedRes, partition.GetTotalPartitionResource()) { |
| t.Errorf("Expected partition resource %s, doesn't match with actual partition resource %s", expectedRes, partition.GetTotalPartitionResource()) |
| } |
| |
| // delta resource for a node with mem as 450 and vcores as 10 (mem has increased but vcores have decreased)
| delta = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 50000000, "vcore": -20000}) |
| partition.updatePartitionResource(delta) |
| |
| expectedRes, err = resources.NewResourceFromConf(map[string]string{"memory": "450M", "vcore": "10"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| if !resources.Equals(expectedRes, partition.GetTotalPartitionResource()) { |
| t.Errorf("Expected partition resource %s, doesn't match with actual partition resource %s", expectedRes, partition.GetTotalPartitionResource()) |
| } |
| } |
| |
| func TestAddTGApplication(t *testing.T) { |
| limit := map[string]string{"vcore": "1"} |
| partition, err := newLimitedPartition(limit) |
| assert.NilError(t, err, "partition create failed") |
| // add an app with a TG that does not fit in the queue
| var tgRes *resources.Resource |
| tgRes, err = resources.NewResourceFromConf(map[string]string{"vcore": "10"}) |
| assert.NilError(t, err, "failed to create resource") |
| tags := map[string]string{ |
| siCommon.AppTagNamespaceResourceGuaranteed: "{\"resources\":{\"vcore\":{\"value\":111}}}", |
| siCommon.AppTagNamespaceResourceQuota: "{\"resources\":{\"vcore\":{\"value\":2222}}}", |
| } |
| app := newApplicationTGTags(appID1, "default", "root.limited", tgRes, tags) |
| err = partition.AddApplication(app) |
| if err == nil { |
| t.Error("app-1 should be rejected due to TG request") |
| } |
| queue := partition.GetQueue("root.limited") |
| assert.Assert(t, resources.Equals(queue.GetMaxResource(), resources.NewResourceFromMap(map[string]resources.Quantity{ |
| "vcore": 1000, |
| })), "max resource changed unexpectedly") |
| assert.Assert(t, queue.GetGuaranteedResource() == nil) |
| |
| // add an app with a TG that does fit in the queue
| limit = map[string]string{"vcore": "100"} |
| partition, err = newLimitedPartition(limit) |
| assert.NilError(t, err, "partition create failed") |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "app-1 should have been added to the partition") |
| assert.Equal(t, partition.getApplication(appID1), app, "partition failed to add app incorrect app returned") |
| queue = partition.GetQueue("root.limited") |
| assert.Assert(t, resources.Equals(queue.GetMaxResource(), resources.NewResourceFromMap(map[string]resources.Quantity{ |
| "vcore": 100000, |
| })), "max resource changed unexpectedly") |
| assert.Assert(t, queue.GetGuaranteedResource() == nil) |
| |
| // add an app with a TG that fits as the TG resource type is not limited in the queue
| limit = map[string]string{"second": "100"} |
| partition, err = newLimitedPartition(limit) |
| assert.NilError(t, err, "partition create failed") |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "app-1 should have been added to the partition") |
| assert.Equal(t, partition.getApplication(appID1), app, "partition failed to add app, incorrect app returned")
| queue = partition.GetQueue("root.limited") |
| assert.Assert(t, resources.Equals(queue.GetMaxResource(), resources.NewResourceFromMap(map[string]resources.Quantity{ |
| "second": 100, |
| })), "max resource changed unexpectedly") |
| assert.Assert(t, queue.GetGuaranteedResource() == nil) |
| } |
| |
| func TestAddTGAppDynamic(t *testing.T) { |
| partition, err := newPlacementPartition() |
| assert.NilError(t, err, "partition create failed") |
| // add an app with a TG that fits in the dynamic queue (no limit)
| var tgRes *resources.Resource |
| tgRes, err = resources.NewResourceFromConf(map[string]string{"vcore": "10"}) |
| assert.NilError(t, err, "failed to create resource") |
| tags := map[string]string{"taskqueue": "unlimited"} |
| app := newApplicationTGTags(appID1, "default", "unknown", tgRes, tags) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "app-1 should have been added to the partition") |
| assert.Equal(t, app.GetQueuePath(), "root.unlimited", "app-1 not placed in expected queue") |
| |
| jsonMaxRes := "{\"resources\":{\"vcore\":{\"value\":10000}}}" |
| tags = map[string]string{"taskqueue": "same", siCommon.AppTagNamespaceResourceQuota: jsonMaxRes} |
| app = newApplicationTGTags(appID2, "default", "unknown", tgRes, tags) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "app-2 should have been added to the partition") |
| assert.Equal(t, partition.getApplication(appID2), app, "partition failed to add app, incorrect app returned")
| assert.Equal(t, app.GetQueuePath(), "root.same", "app-2 not placed in expected queue") |
| queue := partition.GetQueue("root.same") |
| assert.Assert(t, queue.GetGuaranteedResource() == nil, "guaranteed resource should be unset") |
| maxRes := queue.GetMaxResource() |
| assert.Assert(t, maxRes != nil, "maximum resource should have been set") |
| assert.Assert(t, resources.Equals(maxRes, resources.NewResourceFromMap(map[string]resources.Quantity{ |
| "vcore": 10000, |
| })), "max resource set on the queue does not match the JSON tag") |
| |
| jsonMaxRes = "{\"resources\":{\"vcore\":{\"value\":1000}}}" |
| jsonGuaranteedRes := "{\"resources\":{\"vcore\":{\"value\":111}}}" |
| tags = map[string]string{"taskqueue": "smaller", siCommon.AppTagNamespaceResourceQuota: jsonMaxRes, siCommon.AppTagNamespaceResourceGuaranteed: jsonGuaranteedRes} |
| app = newApplicationTGTags(appID3, "default", "unknown", tgRes, tags) |
| err = partition.AddApplication(app) |
| if err == nil { |
| t.Error("app-3 should not have been added to the partition: TG & dynamic limit") |
| } |
| if partition.getApplication(appID3) != nil { |
| t.Fatal("partition added app incorrectly should have failed") |
| } |
| queue = partition.GetQueue("root.smaller") |
| if queue == nil { |
| t.Fatal("queue should have been added, even if app failed") |
| } |
| maxRes = queue.GetMaxResource() |
| assert.Assert(t, maxRes != nil, "maximum resource should have been set") |
| assert.Assert(t, resources.Equals(maxRes, resources.NewResourceFromMap(map[string]resources.Quantity{ |
| "vcore": 1000, |
| })), "max resource set on the queue does not match the JSON tag") |
| guaranteedRes := queue.GetGuaranteedResource() |
| assert.Assert(t, guaranteedRes != nil, "guaranteed resource should have been set") |
| assert.Assert(t, resources.Equals(guaranteedRes, resources.NewResourceFromMap(map[string]resources.Quantity{ |
| "vcore": 111, |
| })), "guaranteed resource set on the queue does not match the JSON tag") |
| } |
| |
| func TestPlaceholderSmallerThanReal(t *testing.T) { |
| setupUGM() |
| |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| tgRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5, "second": 5}) |
| phRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2, "second": 2}) |
| |
| // add node to allow allocation |
| node := setupNode(t, nodeID1, partition, tgRes) |
| |
| // add the app with placeholder request |
| app := newApplicationTG(appID1, "default", "root.default", tgRes) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "app-1 should have been added to the partition") |
| // add an ask for a placeholder and allocate |
| ask := newAllocationAskTG(phID, appID1, taskGroup, phRes, true) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add placeholder ask ph-1 to app") |
| // try to allocate a placeholder via normal allocate |
| ph := partition.tryAllocate() |
| if ph == nil { |
| t.Fatal("expected placeholder ph-1 to be allocated") |
| } |
| assert.Equal(t, phID, ph.GetAllocationKey(), "expected allocation of ph-1 to be returned") |
| assert.Assert(t, resources.Equals(phRes, app.GetQueue().GetAllocatedResource()), "placeholder size should be allocated on queue") |
| assert.Assert(t, resources.Equals(phRes, node.GetAllocatedResource()), "placeholder size should be allocated on node") |
| assert.Equal(t, 1, partition.GetTotalAllocationCount(), "placeholder allocation should be registered on the partition") |
| assertLimits(t, getTestUserGroup(), phRes) |
| |
| // add an ask which is larger than the placeholder |
| ask = newAllocationAskTG(allocKey, appID1, taskGroup, tgRes, false) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app with correct TG") |
| // allocate should trigger the release of the placeholder, nothing else
| if partition.tryPlaceholderAllocate() != nil { |
| t.Fatal("allocation should not have matched placeholder") |
| } |
| assert.Assert(t, ph.IsReleased(), "placeholder should be released") |
| assertLimits(t, getTestUserGroup(), phRes) |
| |
| // release placeholder: do what the context would do after the shim processing |
| release := &si.AllocationRelease{ |
| PartitionName: ph.GetPartitionName(), |
| ApplicationID: appID1, |
| AllocationKey: ph.GetAllocationKey(), |
| TerminationType: si.TerminationType_TIMEOUT, |
| } |
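|	// a TIMEOUT termination removes the placeholder without confirming a replacement: all usage on the
|	// node, queue and partition is expected to drop back to zero, as checked below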
| assert.Equal(t, 1, partition.getPhAllocationCount(), "ph should be registered") |
| |
| released, _ := partition.removeAllocation(release) |
| assert.Equal(t, 0, partition.getPhAllocationCount(), "ph should not be registered") |
| |
| assert.Equal(t, 0, len(released), "expected no releases") |
| assert.Assert(t, resources.IsZero(node.GetAllocatedResource()), "nothing should be allocated on node") |
| assert.Assert(t, resources.IsZero(app.GetQueue().GetAllocatedResource()), "nothing should be allocated on queue") |
| assert.Equal(t, 0, partition.GetTotalAllocationCount(), "no allocation should be registered on the partition") |
| assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0, "second": 0})) |
| } |
| |
| // one real allocation should trigger cleanup of all placeholders |
| func TestPlaceholderSmallerMulti(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| phCount := 5 |
| phRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2, "second": 2}) |
| tgRes := resources.Multiply(phRes, int64(phCount)) |
| |
| // add node to allow allocation |
| node := setupNode(t, nodeID1, partition, tgRes) |
| |
| // add the app with placeholder request |
| app := newApplicationTG(appID1, "default", "root.default", tgRes) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "app-1 should have been added to the partition") |
| phs := make(map[string]*objects.Allocation, 5) |
| for i := 0; i < phCount; i++ { |
| // add an ask for a placeholder and allocate |
| id := "ph-" + strconv.Itoa(i) |
| ask := newAllocationAskTG(id, appID1, taskGroup, phRes, true) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add placeholder ask %s to app", id) |
| // try to allocate a placeholder via normal allocate |
| ph := partition.tryAllocate() |
| if ph == nil { |
| t.Fatalf("expected placeholder %s to be allocated", id) |
| } |
| assert.Equal(t, id, ph.GetAllocationKey(), "expected allocation of %s to be returned", id) |
| phs[id] = ph |
| } |
| assert.Assert(t, resources.Equals(tgRes, app.GetQueue().GetAllocatedResource()), "all placeholders should be allocated on queue") |
| assert.Assert(t, resources.Equals(tgRes, node.GetAllocatedResource()), "all placeholders should be allocated on node") |
| assert.Equal(t, phCount, partition.GetTotalAllocationCount(), "placeholder allocation should be counted as normal allocations on the partition") |
| assert.Equal(t, phCount, partition.getPhAllocationCount(), "placeholder allocation should be counted as placeholders on the partition") |
| assertLimits(t, getTestUserGroup(), tgRes) |
| |
| // add an ask which is larger than the placeholder |
| ask := newAllocationAskTG(allocKey, appID1, taskGroup, tgRes, false) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app with correct TG") |
| // allocate should trigger the release of the placeholders, nothing else
| if partition.tryPlaceholderAllocate() != nil { |
| t.Fatal("allocation should not have matched placeholder") |
| } |
| for id, ph := range phs { |
| assert.Assert(t, ph.IsReleased(), "placeholder %s should be released", id) |
| } |
| assertLimits(t, getTestUserGroup(), tgRes) |
| |
| // release placeholders: do what the context would do after the shim processing |
| for id, ph := range phs { |
| assert.Assert(t, ph.IsReleased(), "placeholder %s should be released", id) |
| release := &si.AllocationRelease{ |
| PartitionName: ph.GetPartitionName(), |
| ApplicationID: appID1, |
| AllocationKey: ph.GetAllocationKey(), |
| TerminationType: si.TerminationType_TIMEOUT, |
| } |
| released, _ := partition.removeAllocation(release) |
| assert.Equal(t, 0, len(released), "expected no releases") |
| } |
| assert.Assert(t, resources.IsZero(node.GetAllocatedResource()), "nothing should be allocated on node") |
| assert.Assert(t, resources.IsZero(app.GetQueue().GetAllocatedResource()), "nothing should be allocated on queue") |
| assert.Equal(t, 0, partition.GetTotalAllocationCount(), "no allocation should be registered on the partition") |
| assert.Equal(t, 0, partition.getPhAllocationCount(), "no placeholder allocation should be on the partition") |
| assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0, "second": 0})) |
| } |
| |
| func TestPlaceholderBiggerThanReal(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| if alloc := partition.tryPlaceholderAllocate(); alloc != nil { |
| t.Fatalf("empty cluster placeholder allocate returned allocation: %s", alloc) |
| } |
| |
| tgRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5, "second": 5}) |
| phRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2, "second": 2}) |
| smallRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1, "second": 1}) |
| |
| // add node to allow allocation |
| node := setupNode(t, nodeID1, partition, tgRes) |
| |
| // add the app with placeholder request |
| app := newApplicationTG(appID1, "default", "root.default", tgRes) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "app-1 should have been added to the partition") |
| // add an ask for a placeholder and allocate |
| ask := newAllocationAskTG(phID, appID1, taskGroup, phRes, true) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add placeholder ask ph-1 to app") |
| // try to allocate a placeholder via normal allocate |
| ph := partition.tryAllocate() |
| if ph == nil { |
| t.Fatal("expected placeholder ph-1 to be allocated") |
| } |
| assert.Equal(t, phID, ph.GetAllocationKey(), "expected allocation of ph-1 to be returned") |
| assert.Assert(t, resources.Equals(phRes, app.GetQueue().GetAllocatedResource()), "placeholder size should be allocated on queue") |
| assert.Assert(t, resources.Equals(phRes, node.GetAllocatedResource()), "placeholder size should be allocated on node") |
| assert.Equal(t, 1, partition.GetTotalAllocationCount(), "placeholder allocation should be registered on the partition") |
| assert.Equal(t, 1, partition.getPhAllocationCount(), "placeholder allocation should be registered") |
| assertLimits(t, getTestUserGroup(), phRes) |
| |
| // add a new ask with smaller request and allocate |
| ask = newAllocationAskTG(allocKey, appID1, taskGroup, smallRes, false) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app with correct TG") |
| alloc := partition.tryPlaceholderAllocate() |
| if alloc == nil { |
| t.Fatal("allocation should have matched placeholder") |
| } |
| assert.Equal(t, objects.Replaced, alloc.GetResult(), "expected replacement result to be returned") |
| assert.Equal(t, 1, alloc.GetReleaseCount(), "placeholder should have been linked") |
| // no updates yet on queue and node |
| assert.Assert(t, resources.Equals(phRes, app.GetQueue().GetAllocatedResource()), "placeholder size should still be allocated on queue") |
| assert.Assert(t, resources.Equals(phRes, node.GetAllocatedResource()), "placeholder size should still be allocated on node") |
| assertLimits(t, getTestUserGroup(), phRes) |
| |
| // replace the placeholder: do what the context would do after the shim processing |
| release := &si.AllocationRelease{ |
| PartitionName: ph.GetPartitionName(), |
| ApplicationID: appID1, |
| AllocationKey: ph.GetAllocationKey(), |
| TerminationType: si.TerminationType_PLACEHOLDER_REPLACED, |
| } |
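|	// a PLACEHOLDER_REPLACED termination swaps the placeholder for the linked real allocation: nothing is
|	// released back to the shim and the confirmed allocation is the real one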
| released, confirmed := partition.removeAllocation(release) |
| assert.Equal(t, 0, len(released), "no allocation should be released") |
| if confirmed == nil { |
| t.Fatal("one allocation should be confirmed") |
| } |
| assert.Equal(t, 1, partition.GetTotalAllocationCount(), "real allocation should be registered on the partition") |
| assert.Equal(t, 0, partition.getPhAllocationCount(), "no placeholder allocation should be registered") |
| assert.Assert(t, resources.Equals(smallRes, app.GetQueue().GetAllocatedResource()), "real size should be allocated on queue") |
| assert.Assert(t, resources.Equals(smallRes, node.GetAllocatedResource()), "real size should be allocated on node") |
| assert.Assert(t, !app.IsCompleting(), "application with allocation should not be in COMPLETING state") |
| assertLimits(t, getTestUserGroup(), smallRes) |
| } |
| |
| func TestPlaceholderMatch(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| tgRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 10}) |
| phRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2, "second": 2}) |
| |
| // add node to allow allocation |
| setupNode(t, nodeID1, partition, tgRes) |
| |
| // add the app with placeholder request |
| app := newApplicationTG(appID1, "default", "root.default", tgRes) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "app-1 should have been added to the partition") |
| // add an ask for a placeholder and allocate |
| ask := newAllocationAskTG(phID, appID1, taskGroup, phRes, true) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add placeholder ask ph-1 to app") |
| // try to allocate a placeholder via normal allocate |
| ph := partition.tryAllocate() |
| if ph == nil { |
| t.Fatal("expected placeholder ph-1 to be allocated") |
| } |
| phAllocationKey := ph.GetAllocationKey() |
| assert.Equal(t, phID, ph.GetAllocationKey(), "expected allocation of ph-1 to be returned") |
| assert.Equal(t, 1, len(app.GetAllPlaceholderData()), "placeholder data should be created on allocate") |
| assert.Equal(t, 1, partition.GetTotalAllocationCount(), "placeholder allocation should be registered as allocation") |
| assert.Equal(t, 1, partition.getPhAllocationCount(), "placeholder allocation should be registered") |
| assertLimits(t, getTestUserGroup(), phRes) |
| |
| // add a new ask with an unknown task group (should allocate directly) |
| ask = newAllocationAskTG(allocKey, appID1, "unknown", phRes, false) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app") |
| alloc := partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("expected ask to be allocated (unmatched task group)") |
| } |
| assert.Equal(t, 2, partition.GetTotalAllocationCount(), "allocations should be registered: ph + normal") |
| assert.Equal(t, 1, partition.getPhAllocationCount(), "placeholder allocation should be registered") |
| assert.Equal(t, allocKey, alloc.GetAllocationKey(), "expected allocation of alloc-1 to be returned") |
| assert.Equal(t, 1, len(app.GetAllPlaceholderData()), "placeholder data should not be updated") |
| assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].Count, "placeholder data should show 1 available placeholder") |
| assert.Equal(t, int64(0), app.GetAllPlaceholderData()[0].Replaced, "placeholder data should show no replacements") |
| assertLimits(t, getTestUserGroup(), resources.Multiply(phRes, 2)) |
| |
| // add a new ask with the same task group as the placeholder
| ask = newAllocationAskTG(allocKey2, appID1, taskGroup, phRes, false) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app") |
| alloc = partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("expected ask not to be allocated (matched task group)") |
| } |
| assert.Equal(t, 1, len(app.GetAllPlaceholderData()), "placeholder data should not be updated") |
| assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].Count, "placeholder data should show 1 available placeholder") |
| assert.Equal(t, int64(0), app.GetAllPlaceholderData()[0].Replaced, "placeholder data should show no replacements") |
| assertLimits(t, getTestUserGroup(), resources.Multiply(phRes, 2)) |
| |
| // replacing the placeholder should work
| alloc = partition.tryPlaceholderAllocate() |
| if alloc == nil { |
| t.Fatal("allocation should have matched placeholder") |
| } |
| assert.Equal(t, 2, partition.GetTotalAllocationCount(), "allocations should be registered: ph + normal") |
| assert.Equal(t, 1, partition.getPhAllocationCount(), "placeholder allocation should be registered") |
| assert.Equal(t, allocKey2, alloc.GetAllocationKey(), "expected allocation of alloc-2 to be returned") |
| assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].Count, "placeholder data should show 1 available placeholder") |
| assert.Equal(t, int64(0), app.GetAllPlaceholderData()[0].Replaced, "placeholder data should show no replacements yet") |
| |
| // release placeholder: do what the context would do after the shim processing |
| release := &si.AllocationRelease{ |
| PartitionName: "test", |
| ApplicationID: appID1, |
| AllocationKey: phAllocationKey, |
| TerminationType: si.TerminationType_PLACEHOLDER_REPLACED, |
| } |
| released, confirmed := partition.removeAllocation(release) |
| assert.Equal(t, len(released), 0, "not expecting any released allocations") |
| if confirmed == nil { |
| t.Fatal("confirmed allocation should not be nil") |
| } |
| assert.Equal(t, 2, partition.GetTotalAllocationCount(), "two allocations should be registered") |
| assert.Equal(t, 0, partition.getPhAllocationCount(), "no placeholder allocation should be registered") |
| assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].Replaced, "placeholder data should show the replacement") |
| assertLimits(t, getTestUserGroup(), resources.Multiply(phRes, 2)) |
| |
| // add a new ask with the same task group as the placeholder
| // all placeholders are used so direct allocation is expected |
| ask = newAllocationAskTG("no_ph_used", appID1, taskGroup, phRes, false) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask no_ph_used to app") |
| alloc = partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("expected ask to be allocated no placeholders left") |
| } |
| assertLimits(t, getTestUserGroup(), resources.Multiply(phRes, 3)) |
| assert.Equal(t, 3, partition.GetTotalAllocationCount(), "three allocations should be registered") |
| assert.Equal(t, 0, partition.getPhAllocationCount(), "no placeholder allocation should be registered") |
| } |
| |
| func TestPreemptedPlaceholderSkip(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| tgRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 10}) |
| phRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2, "second": 2}) |
| |
| // add node to allow allocation |
| setupNode(t, nodeID1, partition, tgRes) |
| |
| // add the app with placeholder request |
| app := newApplicationTG(appID1, "default", "root.default", tgRes) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "app-1 should have been added to the partition") |
| // add an ask for a placeholder and allocate |
| ask := newAllocationAskTG(phID, appID1, taskGroup, phRes, true) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add placeholder ask ph-1 to app") |
| // try to allocate a placeholder via normal allocate |
| ph := partition.tryAllocate() |
| if ph == nil { |
| t.Fatal("expected placeholder ph-1 to be allocated") |
| } |
| phAllocationKey := ph.GetAllocationKey() |
| assert.Equal(t, phID, ph.GetAllocationKey(), "expected allocation of ph-1 to be returned") |
| assert.Equal(t, 1, len(app.GetAllPlaceholderData()), "placeholder data should be created on allocate") |
| assert.Equal(t, 1, partition.GetTotalAllocationCount(), "placeholder allocation should be registered as allocation") |
| assert.Equal(t, 1, partition.getPhAllocationCount(), "placeholder allocation should be registered") |
| |
| // add a new ask with the same task group as the placeholder
| ask = newAllocationAskTG(allocKey, appID1, taskGroup, phRes, false) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app") |
| alloc := partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("expected ask not to be allocated (matched task group)") |
| } |
| |
| // mark the placeholder as preempted (shortcut not interested in usage accounting etc.) |
| ph.MarkPreempted() |
| |
| // replacing the placeholder should NOT work
| alloc = partition.tryPlaceholderAllocate() |
| if alloc != nil { |
| t.Fatal("allocation should not have matched placeholder") |
| } |
| |
| // release placeholder: do what the context would do after the shim processing |
| release := &si.AllocationRelease{ |
| PartitionName: "test", |
| ApplicationID: appID1, |
| AllocationKey: phAllocationKey, |
| TerminationType: si.TerminationType_PREEMPTED_BY_SCHEDULER, |
| } |
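|	// a preempted placeholder is removed without confirming anything; the removal is expected to be tracked
|	// as a timeout in the placeholder data (see the TimedOut assertion below)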
| released, confirmed := partition.removeAllocation(release) |
| assert.Equal(t, len(released), 0, "expecting no released allocation") |
| if confirmed != nil { |
| t.Fatal("confirmed allocation should be nil") |
| } |
| assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].TimedOut, "placeholder data should show the preemption") |
| assert.Equal(t, 0, partition.GetTotalAllocationCount(), "no allocation should be registered") |
| assert.Equal(t, 0, partition.getPhAllocationCount(), "no placeholder allocation should be registered") |
| |
| // normal allocate should work as we have no placeholders left |
| alloc = partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("expected ask to be allocated (no placeholder left)") |
| } |
| assert.Equal(t, allocKey, alloc.GetAllocationKey(), "expected allocation of alloc-1 to be returned") |
| assert.Equal(t, 1, len(app.GetAllPlaceholderData()), "placeholder data should not be updated") |
| assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].Count, "placeholder data should show 1 available placeholder") |
| assert.Equal(t, int64(0), app.GetAllPlaceholderData()[0].Replaced, "placeholder data should show no replacements") |
| assert.Equal(t, int64(1), app.GetAllPlaceholderData()[0].TimedOut, "placeholder data should show the preemption") |
| assert.Equal(t, 1, partition.GetTotalAllocationCount(), "allocation should be registered as allocation") |
| assert.Equal(t, 0, partition.getPhAllocationCount(), "placeholder allocation should be registered") |
| } |
| |
| // simple direct replace with one node |
| func TestTryPlaceholderAllocate(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| if alloc := partition.tryPlaceholderAllocate(); alloc != nil { |
| t.Fatalf("empty cluster placeholder allocate returned allocation: %s", alloc) |
| } |
| |
| var tgRes, res *resources.Resource |
| tgRes, err = resources.NewResourceFromConf(map[string]string{"vcore": "10"}) |
| assert.NilError(t, err, "failed to create resource") |
| res, err = resources.NewResourceFromConf(map[string]string{"vcore": "1"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| // add node to allow allocation |
| node := setupNode(t, nodeID1, partition, tgRes) |
| |
| // add the app with placeholder request |
| app := newApplicationTG(appID1, "default", "root.default", tgRes) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "app-1 should have been added to the partition") |
| // add an ask for a placeholder and allocate |
| ask := newAllocationAskTG(phID, appID1, taskGroup, res, true) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add placeholder ask ph-1 to app") |
| // tryPlaceholderAllocate should just return as there is nothing to replace yet
| alloc := partition.tryPlaceholderAllocate() |
| if alloc != nil { |
| t.Fatalf("placeholder ask should not be allocated: %s", alloc) |
| } |
| assertLimits(t, getTestUserGroup(), nil) |
| // try to allocate a placeholder via normal allocate |
| alloc = partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("expected first placeholder to be allocated") |
| } |
| assert.Equal(t, node.GetAllocation(alloc.GetAllocationKey()), alloc, "placeholder allocation not found on node") |
| assert.Assert(t, alloc.IsPlaceholder(), "placeholder alloc should return a placeholder allocation") |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "placeholder alloc should return an allocated result") |
| if !resources.Equals(app.GetPlaceholderResource(), res) { |
| t.Fatalf("placeholder allocation not updated as expected: got %s, expected %s", app.GetPlaceholderResource(), res) |
| } |
| assert.Equal(t, 1, partition.GetTotalAllocationCount(), "placeholder allocation should be counted as alloc") |
| assert.Equal(t, 1, partition.getPhAllocationCount(), "placeholder allocation should be registered") |
| assertLimits(t, getTestUserGroup(), res) |
| // add a second ph ask and run it again: it should not match the already allocated placeholder
| ask = newAllocationAskTG("ph-2", appID1, taskGroup, res, true) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add placeholder ask ph-2 to app") |
| // tryPlaceholderAllocate should just return as there is nothing to replace yet
| alloc = partition.tryPlaceholderAllocate() |
| if alloc != nil { |
| t.Fatalf("placeholder ask should not be allocated: %s", alloc) |
| } |
| assertLimits(t, getTestUserGroup(), res) |
| alloc = partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("expected 2nd placeholder to be allocated") |
| } |
| assert.Equal(t, node.GetAllocation(alloc.GetAllocationKey()), alloc, "placeholder allocation 2 not found on node") |
| if !resources.Equals(app.GetPlaceholderResource(), resources.Multiply(res, 2)) { |
| t.Fatalf("placeholder allocation not updated as expected: got %s, expected %s", app.GetPlaceholderResource(), resources.Multiply(res, 2)) |
| } |
| assert.Equal(t, 2, partition.GetTotalAllocationCount(), "placeholder allocation should be counted as alloc") |
| assert.Equal(t, 2, partition.getPhAllocationCount(), "placeholder allocation should be registered") |
| assertLimits(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| |
| // an ask not mapping to the same task group should not do anything
| ask = newAllocationAskTG(allocKey, appID1, "tg-unk", res, false) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app") |
| alloc = partition.tryPlaceholderAllocate() |
| if alloc != nil { |
| t.Fatalf("allocation should not have matched placeholder: %s", alloc) |
| } |
| assertLimits(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| |
| // add an ask with the TG |
| ask = newAllocationAskTG(allocKey2, appID1, taskGroup, res, false) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app with correct TG") |
| alloc = partition.tryPlaceholderAllocate() |
| if alloc == nil { |
| t.Fatal("allocation should have matched placeholder") |
| } |
| assert.Equal(t, 2, partition.GetTotalAllocationCount(), "placeholder replacement should not be counted as alloc") |
| assert.Equal(t, 2, partition.getPhAllocationCount(), "placeholder allocation should be registered") |
| assert.Equal(t, alloc.GetResult(), objects.Replaced, "result is not the expected allocated replaced") |
| assert.Equal(t, alloc.GetReleaseCount(), 1, "released allocations should have been 1") |
| assertLimits(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| phAllocationKey := alloc.GetFirstRelease().GetAllocationKey() |
| // placeholder is not released until confirmed by the shim |
| if !resources.Equals(app.GetPlaceholderResource(), resources.Multiply(res, 2)) { |
| t.Fatalf("placeholder allocation not updated as expected: got %s, expected %s", app.GetPlaceholderResource(), resources.Multiply(res, 2)) |
| } |
| assert.Assert(t, resources.IsZero(app.GetAllocatedResource()), "allocated resources should still be zero") |
| // release placeholder: do what the context would do after the shim processing |
| release := &si.AllocationRelease{ |
| PartitionName: "test", |
| ApplicationID: appID1, |
| AllocationKey: phAllocationKey, |
| TerminationType: si.TerminationType_PLACEHOLDER_REPLACED, |
| } |
| released, confirmed := partition.removeAllocation(release) |
| assert.Equal(t, 2, partition.GetTotalAllocationCount(), "still should have 2 allocation after 1 placeholder release") |
| assert.Equal(t, 1, partition.getPhAllocationCount(), "placeholder allocation should be removed") |
| assert.Equal(t, len(released), 0, "not expecting any released allocations") |
| if confirmed == nil { |
| t.Fatal("confirmed allocation should not be nil") |
| } |
| assert.Equal(t, confirmed.GetAllocationKey(), alloc.GetAllocationKey(), "confirmed allocation has unexpected AllocationKey") |
| if !resources.Equals(app.GetPlaceholderResource(), res) { |
| t.Fatalf("placeholder allocations not updated as expected: got %s, expected %s", app.GetPlaceholderResource(), res) |
| } |
| if !resources.Equals(app.GetAllocatedResource(), res) { |
| t.Fatalf("allocations not updated as expected: got %s, expected %s", app.GetAllocatedResource(), res) |
| } |
| assertLimits(t, getTestUserGroup(), resources.Multiply(res, 2)) |
| } |
| |
| // The failure is triggered by the predicate plugin and is hidden in the alloc handling |
| func TestFailReplacePlaceholder(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| if alloc := partition.tryPlaceholderAllocate(); alloc != nil { |
| t.Fatalf("empty cluster placeholder allocate returned allocation: %s", alloc) |
| } |
| // plugin that makes the pre-check fail on node-1 only, which means we cannot replace the placeholder on that node
| plugin := mock.NewPredicatePlugin(false, map[string]int{nodeID1: -1}) |
| plugins.RegisterSchedulerPlugin(plugin) |
| defer func() { |
| passPlugin := mock.NewPredicatePlugin(false, nil) |
| plugins.RegisterSchedulerPlugin(passPlugin) |
| }() |
| var tgRes, res *resources.Resource |
| tgRes, err = resources.NewResourceFromConf(map[string]string{"vcore": "10"}) |
| assert.NilError(t, err, "failed to create resource") |
| res, err = resources.NewResourceFromConf(map[string]string{"vcore": "1"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| // add node to allow allocation |
| node := setupNode(t, nodeID1, partition, tgRes) |
| |
| // add the app with placeholder request |
| app := newApplicationTG(appID1, "default", "root.default", tgRes) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "app-1 should have been added to the partition") |
| // add an ask for a placeholder and allocate |
| ask := newAllocationAskTG(phID, appID1, taskGroup, res, true) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add placeholder ask ph-1 to app") |
| |
| // try to allocate a placeholder via normal allocate |
| alloc := partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("expected first placeholder to be allocated") |
| } |
| assert.Equal(t, partition.GetTotalAllocationCount(), 1, "placeholder allocation should be counted as alloc") |
| assert.Equal(t, partition.getPhAllocationCount(), 1, "placeholder allocation should be counted as placeholder") |
| assert.Equal(t, node.GetAllocation(alloc.GetAllocationKey()), alloc, "placeholder allocation not found on node") |
| assert.Assert(t, alloc.IsPlaceholder(), "placeholder alloc should return a placeholder allocation") |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "placeholder alloc should return an allocated result") |
| assert.Equal(t, alloc.GetNodeID(), nodeID1, "should be allocated on node-1") |
| assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), res), "placeholder allocation not updated as expected: got %s, expected %s", app.GetPlaceholderResource(), res) |
| assertLimits(t, getTestUserGroup(), res) |
| |
| // add 2nd node to allow allocation |
| node2 := setupNode(t, nodeID2, partition, tgRes) |
| assertLimits(t, getTestUserGroup(), res) |
| // add an ask with the TG |
| ask = newAllocationAskTG(allocKey, appID1, taskGroup, res, false) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app with correct TG") |
| alloc = partition.tryPlaceholderAllocate() |
| if alloc == nil { |
| t.Fatal("allocation should have matched placeholder") |
| } |
| assert.Equal(t, partition.GetTotalAllocationCount(), 1, "placeholder replacement should not be counted as alloc") |
| assert.Equal(t, partition.getPhAllocationCount(), 1, "placeholder allocation should not change") |
| assert.Equal(t, alloc.GetResult(), objects.Replaced, "result is not the expected allocated replaced") |
| assert.Equal(t, alloc.GetReleaseCount(), 1, "released allocations should have been 1") |
| // allocation must be added as it is on a different node |
| assert.Equal(t, alloc.GetNodeID(), nodeID2, "should be allocated on node-2") |
| assert.Assert(t, resources.IsZero(app.GetAllocatedResource()), "allocated resources should be zero") |
| assert.Assert(t, resources.Equals(node.GetAllocatedResource(), res), "node-1 allocation not updated as expected: got %s, expected %s", node.GetAllocatedResource(), res) |
| assert.Assert(t, resources.Equals(node2.GetAllocatedResource(), res), "node-2 allocation not updated as expected: got %s, expected %s", node2.GetAllocatedResource(), res) |
| |
| phAllocationKey := alloc.GetFirstRelease().GetAllocationKey() |
| // placeholder is not released until confirmed by the shim |
| assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), res), "placeholder allocation not updated as expected: got %s, expected %s", app.GetPlaceholderResource(), resources.Multiply(res, 2)) |
| assertLimits(t, getTestUserGroup(), res) |
| |
| // release placeholder: do what the context would do after the shim processing |
| release := &si.AllocationRelease{ |
| PartitionName: "test", |
| ApplicationID: appID1, |
| AllocationKey: phAllocationKey, |
| TerminationType: si.TerminationType_PLACEHOLDER_REPLACED, |
| } |
| released, confirmed := partition.removeAllocation(release) |
| assert.Equal(t, partition.GetTotalAllocationCount(), 1, "still should have 1 allocation after placeholder release") |
| assert.Equal(t, partition.getPhAllocationCount(), 0, "placeholder allocation should be removed") |
| assert.Equal(t, len(released), 0, "not expecting any released allocations") |
| if confirmed == nil { |
| t.Fatal("confirmed allocation should not be nil") |
| } |
| assert.Equal(t, confirmed.GetAllocationKey(), alloc.GetAllocationKey(), "confirmed allocation has unexpected AllocationKey") |
| assert.Assert(t, resources.IsZero(app.GetPlaceholderResource()), "placeholder resources should be zero") |
| assert.Assert(t, resources.Equals(app.GetAllocatedResource(), res), "allocations not updated as expected: got %s, expected %s", app.GetAllocatedResource(), res) |
| assert.Assert(t, resources.IsZero(node.GetAllocatedResource()), "node-1 allocated resources should be zero") |
| assert.Assert(t, resources.Equals(node2.GetAllocatedResource(), res), "node-2 allocations not set as expected: got %s, expected %s", node2.GetAllocatedResource(), res) |
| assert.Assert(t, !app.IsCompleting(), "application with allocation should not be in COMPLETING state") |
| assertLimits(t, getTestUserGroup(), res) |
| } |
| |
| func TestAddAllocationAsk(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| err = partition.addAllocationAsk(nil) |
| assert.NilError(t, err, "nil ask should not return an error") |
| err = partition.addAllocationAsk(&si.AllocationAsk{}) |
| if err == nil { |
| t.Fatal("empty ask should have returned application not found error") |
| } |
| |
| // add the app to add an ask to |
| app := newApplication(appID1, "default", "root.default") |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "app-1 should have been added to the partition") |
| // a simple ask should be added without an error
| var res *resources.Resource |
| res, err = resources.NewResourceFromConf(map[string]string{"vcore": "10"}) |
| assert.NilError(t, err, "failed to create resource") |
| askKey := "ask-key-1" |
| ask := si.AllocationAsk{ |
| AllocationKey: askKey, |
| ApplicationID: appID1, |
| ResourceAsk: res.ToProto(), |
| } |
| err = partition.addAllocationAsk(&ask) |
| assert.NilError(t, err, "failed to add ask to app") |
| if !resources.Equals(app.GetPendingResource(), res) { |
| t.Fatal("app not updated by adding ask, no error thrown") |
| } |
| assertLimits(t, getTestUserGroup(), nil) |
| } |
| |
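| // TestRemoveAllocationAsk verifies that only a release matching the application and ask key, with a non-timeout termination, removes the pending ask.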
| func TestRemoveAllocationAsk(t *testing.T) { |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| // add the app |
| app := newApplication(appID1, "default", "root.default") |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "app-1 should have been added to the partition") |
| var res *resources.Resource |
| res, err = resources.NewResourceFromConf(map[string]string{"vcore": "10"}) |
| assert.NilError(t, err, "failed to create resource") |
| askKey := "ask-key-1" |
| ask := newAllocationAsk(askKey, appID1, res) |
| err = app.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to ask to application") |
| |
| // we should not panic on nil |
| partition.removeAllocationAsk(nil) |
| // we don't care about the partition name as we test just the partition code |
| release := &si.AllocationAskRelease{ |
| ApplicationID: "fake-app", |
| AllocationKey: askKey, |
| TerminationType: si.TerminationType_STOPPED_BY_RM, |
| } |
| // unknown app should do nothing |
| partition.removeAllocationAsk(release) |
| if !resources.Equals(app.GetPendingResource(), res) { |
| t.Fatal("wrong app updated removing ask") |
| } |
| |
| // known app, unknown ask no change |
| release.ApplicationID = appID1 |
| release.AllocationKey = "fake" |
| partition.removeAllocationAsk(release) |
| if !resources.Equals(app.GetPendingResource(), res) { |
| t.Fatal("app updated removing unknown ask") |
| } |
| |
| // known app, known ask, ignore timeout as it originates in the core |
| release.AllocationKey = askKey |
| release.TerminationType = si.TerminationType_TIMEOUT |
| partition.removeAllocationAsk(release) |
| if !resources.Equals(app.GetPendingResource(), res) { |
| t.Fatal("app updated removing timed out ask, should not have changed") |
| } |
| |
| // correct remove of a known ask |
| release.TerminationType = si.TerminationType_STOPPED_BY_RM |
| partition.removeAllocationAsk(release) |
| assert.Assert(t, resources.IsZero(app.GetPendingResource()), "app should not have pending asks") |
| assertLimits(t, getTestUserGroup(), nil) |
| } |
| |
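| // TestUpdatePreemption verifies that preemption stays enabled unless the partition config explicitly disables it.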
| func TestUpdatePreemption(t *testing.T) { |
| var True = true |
| var False = false |
| |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "Partition creation failed") |
| assert.Assert(t, partition.isPreemptionEnabled(), "preeemption should be enabled by default") |
| |
| partition.updatePreemption(configs.PartitionConfig{}) |
| assert.Assert(t, partition.isPreemptionEnabled(), "preeemption should be enabled by empty config") |
| |
| partition.updatePreemption(configs.PartitionConfig{Preemption: configs.PartitionPreemptionConfig{}}) |
| assert.Assert(t, partition.isPreemptionEnabled(), "preeemption should be enabled by empty preemption section") |
| |
| partition.updatePreemption(configs.PartitionConfig{Preemption: configs.PartitionPreemptionConfig{Enabled: nil}}) |
| assert.Assert(t, partition.isPreemptionEnabled(), "preeemption should be enabled by explicit nil") |
| |
| partition.updatePreemption(configs.PartitionConfig{Preemption: configs.PartitionPreemptionConfig{Enabled: &True}}) |
| assert.Assert(t, partition.isPreemptionEnabled(), "preeemption should be enabled by explicit true") |
| |
| partition.updatePreemption(configs.PartitionConfig{Preemption: configs.PartitionPreemptionConfig{Enabled: &False}}) |
| assert.Assert(t, !partition.isPreemptionEnabled(), "preeemption should be disabled by explicit false") |
| } |
| |
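| // TestUpdateNodeSortingPolicy verifies that a partition config update switches the node sorting policy between binpacking and fair.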
| func TestUpdateNodeSortingPolicy(t *testing.T) { |
| partition, err := newBasePartition() |
| if err != nil {
| t.Fatalf("Partition creation failed: %s", err.Error())
| }
| |
| if partition.nodes.GetNodeSortingPolicy().PolicyType().String() != policies.FairnessPolicy.String() { |
| t.Error("Node policy is not set with the default policy which is fair policy.") |
| } |
| |
| var tests = []struct { |
| name string |
| input string |
| want string |
| }{ |
| {"Set binpacking policy", policies.BinPackingPolicy.String(), policies.BinPackingPolicy.String()}, |
| {"Set fair policy", policies.FairnessPolicy.String(), policies.FairnessPolicy.String()}, |
| } |
| |
| for _, tt := range tests { |
| t.Run(tt.name, func(t *testing.T) { |
| partition.updateNodeSortingPolicy(configs.PartitionConfig{ |
| Name: "test", |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "root", |
| Parent: true, |
| SubmitACL: "*", |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "default", |
| Parent: false, |
| Queues: nil, |
| }, |
| }, |
| }, |
| }, |
| PlacementRules: nil, |
| Limits: nil, |
| NodeSortPolicy: configs.NodeSortingPolicy{Type: tt.input}, |
| }) |
| |
| ans := partition.nodes.GetNodeSortingPolicy().PolicyType().String() |
| if ans != tt.want { |
| t.Errorf("got %s, want %s", ans, tt.want) |
| } |
| }) |
| } |
| } |
| |
| // Test case for the node sorting policy get function in objects/node_collection
| func TestGetNodeSortingPolicyWhenNewPartitionFromConfig(t *testing.T) { |
| var tests = []struct { |
| name string |
| input string |
| want string |
| }{ |
| {"Default policy", "", policies.FairnessPolicy.String()}, |
| {"Fair policy", policies.FairnessPolicy.String(), policies.FairnessPolicy.String()}, |
| {"Binpacking policy", policies.BinPackingPolicy.String(), policies.BinPackingPolicy.String()}, |
| } |
| |
| for _, tt := range tests { |
| t.Run(tt.name, func(t *testing.T) { |
| conf := configs.PartitionConfig{ |
| Name: "test", |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "root", |
| Parent: true, |
| SubmitACL: "*", |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "default", |
| Parent: false, |
| Queues: nil, |
| }, |
| }, |
| }, |
| }, |
| PlacementRules: nil, |
| Limits: nil, |
| NodeSortPolicy: configs.NodeSortingPolicy{ |
| Type: tt.input, |
| }, |
| } |
| |
| p, err := newPartitionContext(conf, rmID, nil) |
| if err != nil {
| t.Fatalf("Partition creation failed: %s", err.Error())
| }
| |
| ans := p.nodes.GetNodeSortingPolicy().PolicyType().String() |
| if ans != tt.want { |
| t.Errorf("got %s, want %s", ans, tt.want) |
| } |
| }) |
| } |
| } |
| |
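| // TestTryAllocateMaxRunning verifies that the queue max running apps limits gate allocations:
| // placeholder allocations keep the app in the accepted state, the parent limit of 1 blocks app-2,
| // and raising the parent limit to 2 allows app-2 to be allocated.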
| func TestTryAllocateMaxRunning(t *testing.T) { |
| const resType = "vcore" |
| partition := createQueuesNodes(t) |
| if partition == nil { |
| t.Fatal("partition create failed") |
| } |
| if alloc := partition.tryAllocate(); alloc != nil { |
| t.Fatalf("empty cluster allocate returned allocation: %v", alloc.String()) |
| } |
| |
| // set max running apps |
| root := partition.getQueueInternal("root") |
| root.SetMaxRunningApps(2) |
| parent := partition.getQueueInternal("root.parent") |
| parent.SetMaxRunningApps(1) |
| |
| // add first app to the partition |
| appRes, err := resources.NewResourceFromConf(map[string]string{resType: "2"}) |
| assert.NilError(t, err, "app resource creation failed") |
| app := newApplicationTG(appID1, "default", "root.parent.sub-leaf", appRes) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app-1 to partition") |
| var res *resources.Resource |
| res, err = resources.NewResourceFromConf(map[string]string{resType: "1"}) |
| assert.NilError(t, err, "failed to create resource") |
| err = app.AddAllocationAsk(newAllocationAskTG(allocKey, appID1, "ph1", res, true)) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-1") |
| // adding the first ask should have moved the app to the accepted state
| assert.Equal(t, app.CurrentState(), objects.Accepted.String(), "application should have moved to accepted state") |
| |
| // allocate should work: app stays in accepted state (placeholder!) |
| alloc := partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") |
| assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations should have been 0") |
| assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application app-1 to be allocated") |
| assert.Equal(t, alloc.GetAllocationKey(), allocKey, "expected ask alloc to be allocated") |
| assert.Equal(t, app.CurrentState(), objects.Accepted.String(), "application should have moved to accepted state") |
| |
| // add second app to the partition |
| app2 := newApplication(appID2, "default", "root.parent.sub-leaf") |
| err = partition.AddApplication(app2) |
| assert.NilError(t, err, "failed to add app-2 to partition") |
| err = app2.AddAllocationAsk(newAllocationAsk(allocKey, appID2, res)) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-2") |
| |
| // allocation should fail: the max running apps limit on the parent is already reached by the accepted, allocating app
| alloc = partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("allocation should not have returned as parent limit is reached") |
| } |
| |
| // allocate should work: the app moves to running once all placeholders are allocated
| err = app.AddAllocationAsk(newAllocationAskTG("alloc-2", appID1, "ph1", res, true)) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app-1") |
| alloc = partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") |
| assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations should have been 0") |
| assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application app-1 to be allocated") |
| assert.Equal(t, alloc.GetAllocationKey(), "alloc-2", "expected ask alloc-2 to be allocated") |
| assert.Equal(t, app.CurrentState(), objects.Running.String(), "application should have moved to running state") |
| |
| // allocation should still fail: max running apps on parent reached |
| alloc = partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("allocation should not have returned as parent limit is reached") |
| } |
| |
| // update the parent queue max running |
| parent.SetMaxRunningApps(2) |
| // allocation works |
| alloc = partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") |
| assert.Equal(t, alloc.GetReleaseCount(), 0, "released allocations should have been 0") |
| assert.Equal(t, alloc.GetApplicationID(), appID2, "expected application app-2 to be allocated") |
| assert.Equal(t, alloc.GetAllocationKey(), allocKey, "expected ask alloc-1 to be allocated") |
| } |
| |
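| // TestNewQueueEvents verifies that queue ADD events are emitted for the static root and
| // root.default queues and a QUEUE_DYNAMIC detail for the dynamically created root.test queue.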
| func TestNewQueueEvents(t *testing.T) { |
| events.Init() |
| eventSystem := events.GetEventSystem().(*events.EventSystemImpl) //nolint:errcheck |
| eventSystem.StartServiceWithPublisher(false) |
| |
| partition, err := newBasePartition() |
| assert.NilError(t, err) |
| _, err = partition.createQueue("root.test", security.UserGroup{ |
| User: "test", |
| }) |
| assert.NilError(t, err) |
| noEvents := uint64(0) |
| err = common.WaitFor(10*time.Millisecond, time.Second, func() bool { |
| noEvents = eventSystem.Store.CountStoredEvents() |
| return noEvents == 3 |
| }) |
| assert.NilError(t, err, "expected 3 events, got %d", noEvents) |
| records := eventSystem.Store.CollectEvents() |
| assert.Equal(t, si.EventRecord_QUEUE, records[0].Type) |
| assert.Equal(t, si.EventRecord_ADD, records[0].EventChangeType) |
| assert.Equal(t, si.EventRecord_DETAILS_NONE, records[0].EventChangeDetail) |
| assert.Equal(t, "root", records[0].ObjectID) |
| assert.Equal(t, si.EventRecord_QUEUE, records[1].Type) |
| assert.Equal(t, si.EventRecord_ADD, records[1].EventChangeType) |
| assert.Equal(t, si.EventRecord_DETAILS_NONE, records[1].EventChangeDetail) |
| assert.Equal(t, "root.default", records[1].ObjectID) |
| assert.Equal(t, si.EventRecord_QUEUE, records[2].Type) |
| assert.Equal(t, si.EventRecord_ADD, records[2].EventChangeType) |
| assert.Equal(t, si.EventRecord_QUEUE_DYNAMIC, records[2].EventChangeDetail) |
| assert.Equal(t, "root.test", records[2].ObjectID) |
| } |
| |
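| // TestUserHeadroom verifies that normal and reserved allocations only proceed while the
| // configured user limits leave headroom, and that a user without limits is not restricted.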
| func TestUserHeadroom(t *testing.T) { |
| setupUGM() |
| partition, err := newConfiguredPartition() |
| assert.NilError(t, err, "test partition create failed with error") |
| var res *resources.Resource |
| res, err = resources.NewResourceFromConf(map[string]string{"memory": "10", "vcores": "10"}) |
| assert.NilError(t, err, "failed to create basic resource") |
| err = partition.AddNode(newNodeMaxResource("node-1", res), nil) |
| assert.NilError(t, err, "test node1 add failed unexpected") |
| err = partition.AddNode(newNodeMaxResource("node-2", res), nil) |
| assert.NilError(t, err, "test node2 add failed unexpected") |
| |
| app1 := newApplication(appID1, "default", "root.parent.sub-leaf") |
| res, err = resources.NewResourceFromConf(map[string]string{"memory": "3", "vcores": "3"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| err = partition.AddApplication(app1) |
| assert.NilError(t, err, "failed to add app-1 to partition") |
| err = app1.AddAllocationAsk(newAllocationAsk(allocKey, appID1, res)) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-1") |
| |
| app2 := newApplication(appID2, "default", "root.parent.sub-leaf") |
| err = partition.AddApplication(app2) |
| assert.NilError(t, err, "failed to add app-2 to partition") |
| err = app2.AddAllocationAsk(newAllocationAsk(allocKey, appID2, res)) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-2") |
| |
| // app 1 would be allocated as there is headroom available for the user |
| alloc := partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") |
| |
| // app 2 allocation won't happen as there is no headroom for the user |
| alloc = partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("allocation should not happen") |
| } |
| |
| res1, err := resources.NewResourceFromConf(map[string]string{"memory": "5", "vcores": "5"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| app3 := newApplication(appID3, "default", "root.leaf") |
| err = partition.AddApplication(app3) |
| assert.NilError(t, err, "failed to add app-3 to partition") |
| err = app3.AddAllocationAsk(newAllocationAsk(allocKey, appID3, res1)) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-3") |
| |
| // app 3 would be allocated as there is headroom available for the user |
| alloc = partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") |
| |
| app4 := newApplication("app-4", "default", "root.leaf") |
| err = partition.AddApplication(app4) |
| assert.NilError(t, err, "failed to add app-4 to partition") |
| err = app4.AddAllocationAsk(newAllocationAsk(allocKey, "app-4", res1)) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-4") |
| |
| // app 4 allocation won't happen as there is no headroom for the user |
| alloc = partition.tryAllocate() |
| if alloc != nil { |
| t.Fatal("allocation should not happen") |
| } |
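| // remove all applications so the user usage is released and headroom becomes available again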
| partition.removeApplication(appID1) |
| partition.removeApplication(appID2) |
| partition.removeApplication(appID3) |
| partition.removeApplication("app-4") |
| |
| // create a reservation and ensure reservation has been allocated because there is enough headroom for the user to run the app |
| app5 := newApplication("app-5", "default", "root.parent.sub-leaf") |
| err = partition.AddApplication(app5) |
| assert.NilError(t, err, "failed to add app-5 to partition") |
| |
| res, err = resources.NewResourceFromConf(map[string]string{"memory": "3", "vcores": "3"}) |
| assert.NilError(t, err, "failed to create resource") |
| ask := newAllocationAsk("alloc-1", "app-5", res) |
| err = app5.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask to app") |
| |
| node2 := partition.GetNode(nodeID2) |
| if node2 == nil { |
| t.Fatal("expected node-2 to be returned got nil") |
| } |
| partition.reserve(app5, node2, ask) |
| |
| // turn off the first node so the reserved allocation must be placed on node-2
| node1 := partition.GetNode(nodeID1) |
| node1.SetSchedulable(false) |
| |
| alloc = partition.tryReservedAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, objects.AllocatedReserved, alloc.GetResult(), "allocation result should have been allocated") |
| |
| // create a reservation and ensure reservation has not been allocated because there is no headroom for the user |
| ask = newAllocationAsk("alloc-2", "app-5", res) |
| err = app5.AddAllocationAsk(ask) |
| assert.NilError(t, err, "failed to add ask to app") |
| partition.reserve(app5, node2, ask) |
| alloc = partition.tryReservedAllocate() |
| if alloc != nil { |
| t.Fatal("allocation should not happen on other nodes as well") |
| } |
| partition.removeApplication("app-5") |
| |
| app6 := newApplicationWithUser("app-6", "default", "root.parent.sub-leaf", security.UserGroup{ |
| User: "testuser1", |
| Groups: []string{"testgroup1"}, |
| }) |
| res, err = resources.NewResourceFromConf(map[string]string{"memory": "3", "vcores": "3"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| err = partition.AddApplication(app6) |
| assert.NilError(t, err, "failed to add app-6 to partition") |
| err = app6.AddAllocationAsk(newAllocationAsk(allocKey, "app-6", res)) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-6") |
| |
| // app 6 should be allocated as the headroom is nil: no limits are configured for the 'testuser1' user and 'testgroup1' group
| alloc = partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") |
| } |
| |
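| // TestPlaceholderAllocationTracking verifies that placeholder allocations are counted in both
| // the total allocation count and the placeholder count, and that replace, stop and timeout
| // releases update those counts as expected.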
| func TestPlaceholderAllocationTracking(t *testing.T) { |
| const phID3 = "ph-3" |
| setupUGM() |
| partition := createQueuesNodes(t) |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "1"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| // add the app with placeholder request |
| app := newApplication(appID1, "default", "root.parent.sub-leaf") |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "app-1 should have been added to the partition") |
| // add three asks for a placeholder and allocate |
| ask1 := newAllocationAskTG(phID, appID1, taskGroup, res, true) |
| err = app.AddAllocationAsk(ask1) |
| assert.NilError(t, err, "failed to add placeholder ask to app") |
| ask2 := newAllocationAskTG(phID2, appID1, taskGroup, res, true) |
| err = app.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "could not add ask") |
| ask3 := newAllocationAskTG(phID3, appID1, taskGroup, res, true) |
| err = app.AddAllocationAsk(ask3) |
| assert.NilError(t, err, "could not add ask") |
| assert.Equal(t, 0, partition.getPhAllocationCount()) |
| // add & allocate real asks |
| ask4 := newAllocationAskTG(allocKey, appID1, taskGroup, res, false) |
| err = app.AddAllocationAsk(ask4) |
| assert.NilError(t, err, "failed to add ask to app") |
| |
| // allocate first placeholder |
| alloc := partition.tryAllocate() |
| assert.Assert(t, alloc != nil, "placeholder ask should have been allocated")
| ph1AllocationKey := alloc.GetAllocationKey()
| assert.Equal(t, 1, partition.GetTotalAllocationCount(), "placeholder not counted as alloc") |
| assert.Equal(t, 1, partition.getPhAllocationCount(), "placeholder not counted as placeholder") |
| alloc1 := partition.tryPlaceholderAllocate() |
| assert.Assert(t, alloc1 != nil, "ask should have been allocated") |
| assert.Equal(t, 1, partition.GetTotalAllocationCount(), "should see no change in alloc count") |
| assert.Equal(t, 1, partition.getPhAllocationCount(), "should see no change in placeholder count") |
| |
| // allocate second placeholder |
| alloc = partition.tryAllocate() |
| assert.Assert(t, alloc != nil, "placeholder ask should have been allocated")
| ph2AllocationKey := alloc.GetAllocationKey()
| assert.Equal(t, 2, partition.GetTotalAllocationCount(), "placeholder not counted as alloc") |
| assert.Equal(t, 2, partition.getPhAllocationCount(), "placeholder not counted as placeholder") |
| // allocate third placeholder |
| alloc = partition.tryAllocate() |
| assert.Assert(t, alloc != nil, "placeholder ask should have been allocated")
| ph3AllocationKey := alloc.GetAllocationKey()
| assert.Equal(t, 3, partition.GetTotalAllocationCount(), "placeholder not counted as alloc") |
| assert.Equal(t, 3, partition.getPhAllocationCount(), "placeholder not counted as placeholder") |
| |
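| // replace ph-1: the real allocation takes its place, so the total count stays at 3 while the placeholder count drops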
| partition.removeAllocation(&si.AllocationRelease{ |
| AllocationKey: ph1AllocationKey, |
| ApplicationID: appID1, |
| TerminationType: si.TerminationType_PLACEHOLDER_REPLACED, |
| }) |
| assert.Equal(t, 3, partition.GetTotalAllocationCount(), "placeholder not counted as alloc") |
| assert.Equal(t, 2, partition.getPhAllocationCount(), "placeholder should be removed from count") |
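| // stop ph-2 from the RM: both the total allocation count and the placeholder count drop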
| partition.removeAllocation(&si.AllocationRelease{ |
| AllocationKey: ph2AllocationKey, |
| ApplicationID: appID1, |
| TerminationType: si.TerminationType_STOPPED_BY_RM, |
| }) |
| assert.Equal(t, 2, partition.GetTotalAllocationCount(), "placeholder should be removed from alloc count") |
| assert.Equal(t, 1, partition.getPhAllocationCount(), "placeholder should be removed from count") |
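| // time out ph-3: the counts drop in the same way as an RM stop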
| partition.removeAllocation(&si.AllocationRelease{ |
| AllocationKey: ph3AllocationKey, |
| ApplicationID: appID1, |
| TerminationType: si.TerminationType_TIMEOUT, |
| }) |
| assert.Equal(t, 1, partition.GetTotalAllocationCount(), "placeholder should be removed from alloc count") |
| assert.Equal(t, 0, partition.getPhAllocationCount(), "placeholder should be removed from count") |
| } |
| |
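| // TestReservationTracking verifies that a required-node ask which does not fit creates a
| // reservation, and that the reservation is removed once the blocking allocation is released
| // and the reserved ask is allocated.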
| func TestReservationTracking(t *testing.T) { |
| setupUGM() |
| partition := createQueuesNodes(t) |
| |
| app := newApplication(appID1, "default", "root.parent.sub-leaf") |
| res, err := resources.NewResourceFromConf(map[string]string{"vcore": "10"}) |
| assert.NilError(t, err, "failed to create resource") |
| |
| // add to the partition |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "failed to add app to partition") |
| ask1 := newAllocationAsk(allocKey, appID1, res) |
| ask1.SetRequiredNode(nodeID1) |
| err = app.AddAllocationAsk(ask1) |
| assert.NilError(t, err, "failed to add ask") |
| ask2 := newAllocationAsk(allocKey2, appID1, res) |
| ask2.SetRequiredNode(nodeID1) |
| err = app.AddAllocationAsk(ask2) |
| assert.NilError(t, err, "failed to add ask") |
| |
| alloc := partition.tryAllocate() // ask1 occupies node1
| assert.Assert(t, alloc != nil, "expected ask1 to be allocated")
| allocationKey := alloc.GetAllocationKey()
| assert.Equal(t, objects.Allocated, alloc.GetResult()) |
| assert.Equal(t, "alloc-1", alloc.GetAllocationKey()) |
| assert.Equal(t, 0, partition.getReservationCount()) |
| alloc = partition.tryAllocate() // ask2 gets reserved |
| assert.Assert(t, alloc == nil) |
| assert.Equal(t, 1, partition.getReservationCount()) |
| |
| partition.removeAllocation(&si.AllocationRelease{ |
| AllocationKey: allocationKey, |
| ApplicationID: appID1, |
| TerminationType: si.TerminationType_STOPPED_BY_RM, |
| }) // terminate ask1 |
| alloc = partition.tryReservedAllocate() // allocate reservation
| assert.Assert(t, alloc != nil, "expected the reserved ask to be allocated")
| assert.Equal(t, 0, partition.getReservationCount()) |
| assert.Equal(t, "alloc-2", alloc.GetAllocationKey()) |
| } |
| |
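| // TestLimitMaxApplications verifies that a max applications limit of 1, configured for a user,
| // a group or via a wildcard, blocks the allocation of a second application on the normal path.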
| //nolint:funlen |
| func TestLimitMaxApplications(t *testing.T) { |
| testCases := []struct { |
| name string |
| limits []configs.Limit |
| }{ |
| { |
| name: "specific user", |
| limits: []configs.Limit{ |
| { |
| Limit: "specific user limit", |
| Users: []string{"testuser"}, |
| MaxResources: map[string]string{"memory": "5", "vcores": "5"}, |
| MaxApplications: 1, |
| }, |
| }, |
| }, |
| { |
| name: "specific group", |
| limits: []configs.Limit{ |
| { |
| Limit: "specific group limit", |
| Groups: []string{"testgroup"}, |
| MaxResources: map[string]string{"memory": "5", "vcores": "5"}, |
| MaxApplications: 1, |
| }, |
| }, |
| }, |
| { |
| name: "wildcard user", |
| limits: []configs.Limit{ |
| { |
| Limit: "wildcard user limit", |
| Users: []string{"*"}, |
| MaxResources: map[string]string{"memory": "5", "vcores": "5"}, |
| MaxApplications: 1, |
| }, |
| }, |
| }, |
| { |
| name: "wildcard group", |
| limits: []configs.Limit{ |
| { |
| Limit: "specific group limit", |
| Groups: []string{"nonexistent-group"}, |
| MaxResources: map[string]string{"memory": "500", "vcores": "500"}, |
| MaxApplications: 100, |
| }, |
| { |
| Limit: "wildcard group limit", |
| Groups: []string{"*"}, |
| MaxResources: map[string]string{"memory": "5", "vcores": "5"}, |
| MaxApplications: 1, |
| }, |
| }, |
| }, |
| { |
| name: "specific user lower than specific group limit", |
| limits: []configs.Limit{ |
| { |
| Limit: "specific user limit", |
| Users: []string{"testuser"}, |
| MaxResources: map[string]string{"memory": "5", "vcores": "5"}, |
| MaxApplications: 1, |
| }, |
| { |
| Limit: "specific user limit", |
| Groups: []string{"testgroup"}, |
| MaxResources: map[string]string{"memory": "5", "vcores": "5"}, |
| MaxApplications: 100, |
| }, |
| }, |
| }, |
| { |
| name: "specific group lower than specific user limit", |
| limits: []configs.Limit{ |
| { |
| Limit: "specific user limit", |
| Users: []string{"testuser"}, |
| MaxResources: map[string]string{"memory": "5", "vcores": "5"}, |
| MaxApplications: 100, |
| }, |
| { |
| Limit: "specific group limit", |
| Groups: []string{"testgroup"}, |
| MaxResources: map[string]string{"memory": "5", "vcores": "5"}, |
| MaxApplications: 1, |
| }, |
| }, |
| }, |
| } |
| for _, tc := range testCases { |
| t.Run(tc.name, func(t *testing.T) { |
| setupUGM() |
| conf := configs.PartitionConfig{ |
| Name: "default", |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "root", |
| Parent: true, |
| SubmitACL: "*", |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "default", |
| Parent: false, |
| Limits: tc.limits, |
| }, |
| }, |
| }, |
| }, |
| NodeSortPolicy: configs.NodeSortingPolicy{}, |
| } |
| |
| partition, err := newPartitionContext(conf, rmID, nil) |
| assert.NilError(t, err, "partition create failed") |
| |
| // add node1 |
| nodeRes, err := resources.NewResourceFromConf(map[string]string{"memory": "10", "vcores": "10"}) |
| assert.NilError(t, err, "failed to create basic resource") |
| err = partition.AddNode(newNodeMaxResource("node-1", nodeRes), nil) |
| assert.NilError(t, err, "test node1 add failed unexpected") |
| |
| resMap := map[string]string{"memory": "2", "vcores": "2"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "Unexpected error when creating resource from map") |
| |
| // add app1 |
| app1 := newApplication(appID1, "default", defQueue) |
| err = partition.AddApplication(app1) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| err = app1.AddAllocationAsk(newAllocationAsk(allocKey, appID1, res)) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-1") |
| |
| alloc := partition.tryAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") |
| assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application app-1 to be allocated") |
| assert.Equal(t, alloc.GetAllocationKey(), allocKey, "expected ask alloc-1 to be allocated") |
| |
| // add app2 |
| app2 := newApplication(appID2, "default", defQueue) |
| err = partition.AddApplication(app2) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| err = app2.AddAllocationAsk(newAllocationAsk(allocKey2, appID2, res)) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app-1") |
| assert.Equal(t, app2.CurrentState(), objects.Accepted.String(), "application should have moved to accepted state") |
| |
| alloc = partition.tryAllocate() |
| assert.Equal(t, alloc == nil, true, "allocation should not have happened as max apps reached") |
| }) |
| } |
| } |
| |
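| // TestLimitMaxApplicationsForReservedAllocation verifies that the same max applications limits
| // are enforced on the reserved allocation path.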
| //nolint:funlen |
| func TestLimitMaxApplicationsForReservedAllocation(t *testing.T) { |
| testCases := []struct { |
| name string |
| limits []configs.Limit |
| }{ |
| { |
| name: "specific user", |
| limits: []configs.Limit{ |
| { |
| Limit: "specific user limit", |
| Users: []string{"testuser"}, |
| MaxResources: map[string]string{"memory": "5", "vcores": "5"}, |
| MaxApplications: 1, |
| }, |
| }, |
| }, |
| { |
| name: "specific group", |
| limits: []configs.Limit{ |
| { |
| Limit: "specific group limit", |
| Groups: []string{"testgroup"}, |
| MaxResources: map[string]string{"memory": "5", "vcores": "5"}, |
| MaxApplications: 1, |
| }, |
| }, |
| }, |
| { |
| name: "wildcard user", |
| limits: []configs.Limit{ |
| { |
| Limit: "wildcard user limit", |
| Users: []string{"*"}, |
| MaxResources: map[string]string{"memory": "5", "vcores": "5"}, |
| MaxApplications: 1, |
| }, |
| }, |
| }, |
| { |
| name: "wildcard group", |
| limits: []configs.Limit{ |
| { |
| Limit: "specific group limit", |
| Groups: []string{"nonexistent-group"}, |
| MaxResources: map[string]string{"memory": "500", "vcores": "500"}, |
| MaxApplications: 100, |
| }, |
| { |
| Limit: "wildcard group limit", |
| Groups: []string{"*"}, |
| MaxResources: map[string]string{"memory": "5", "vcores": "5"}, |
| MaxApplications: 1, |
| }, |
| }, |
| }, |
| { |
| name: "specific user lower than specific group limit", |
| limits: []configs.Limit{ |
| { |
| Limit: "specific user limit", |
| Users: []string{"testuser"}, |
| MaxResources: map[string]string{"memory": "5", "vcores": "5"}, |
| MaxApplications: 1, |
| }, |
| { |
| Limit: "specific user limit", |
| Groups: []string{"testgroup"}, |
| MaxResources: map[string]string{"memory": "5", "vcores": "5"}, |
| MaxApplications: 100, |
| }, |
| }, |
| }, |
| { |
| name: "specific group lower than specific user limit", |
| limits: []configs.Limit{ |
| { |
| Limit: "specific user limit", |
| Users: []string{"testuser"}, |
| MaxResources: map[string]string{"memory": "5", "vcores": "5"}, |
| MaxApplications: 100, |
| }, |
| { |
| Limit: "specific group limit", |
| Groups: []string{"testgroup"}, |
| MaxResources: map[string]string{"memory": "5", "vcores": "5"}, |
| MaxApplications: 1, |
| }, |
| }, |
| }, |
| } |
| for _, tc := range testCases { |
| t.Run(tc.name, func(t *testing.T) { |
| setupUGM() |
| conf := configs.PartitionConfig{ |
| Name: "default", |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "root", |
| Parent: true, |
| SubmitACL: "*", |
| Queues: []configs.QueueConfig{ |
| { |
| Name: "default", |
| Parent: false, |
| Limits: tc.limits, |
| }, |
| }, |
| }, |
| }, |
| NodeSortPolicy: configs.NodeSortingPolicy{}, |
| } |
| |
| partition, err := newPartitionContext(conf, rmID, nil) |
| assert.NilError(t, err, "partition create failed") |
| |
| // add node1 |
| nodeRes, err := resources.NewResourceFromConf(map[string]string{"memory": "10", "vcores": "10"}) |
| assert.NilError(t, err, "failed to create basic resource") |
| node := newNodeMaxResource("node-1", nodeRes) |
| err = partition.AddNode(node, nil) |
| assert.NilError(t, err, "test node1 add failed unexpected") |
| |
| resMap := map[string]string{"memory": "2", "vcores": "2"} |
| res, err := resources.NewResourceFromConf(resMap) |
| assert.NilError(t, err, "Unexpected error when creating resource from map") |
| |
| // add app1 |
| app1 := newApplication(appID1, "default", defQueue) |
| err = partition.AddApplication(app1) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| app1AllocAsk := newAllocationAsk(allocKey, appID1, res) |
| err = app1.AddAllocationAsk(app1AllocAsk) |
| assert.NilError(t, err, "failed to add ask alloc-1 to app-1") |
| |
| partition.reserve(app1, node, app1AllocAsk) |
| alloc := partition.tryReservedAllocate() |
| if alloc == nil { |
| t.Fatal("allocation did not return any allocation") |
| } |
| assert.Equal(t, alloc.GetResult(), objects.AllocatedReserved, "result is not the expected allocated reserved") |
| assert.Equal(t, alloc.GetApplicationID(), appID1, "expected application app-1 to be allocated reserved") |
| assert.Equal(t, alloc.GetAllocationKey(), allocKey, "expected ask alloc-1 to be allocated reserved") |
| |
| // add app2 |
| app2 := newApplication(appID2, "default", defQueue) |
| err = partition.AddApplication(app2) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| app2AllocAsk := newAllocationAsk(allocKey2, appID2, res) |
| err = app2.AddAllocationAsk(app2AllocAsk) |
| assert.NilError(t, err, "failed to add ask alloc-2 to app-1") |
| assert.Equal(t, app2.CurrentState(), objects.Accepted.String(), "application should have moved to accepted state") |
| |
| partition.reserve(app2, node, app2AllocAsk) |
| alloc = partition.tryReservedAllocate() |
| assert.Equal(t, alloc == nil, true, "allocation should not have happened as max apps reached") |
| }) |
| } |
| } |
| |
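| // TestCalculateOutstandingRequests verifies that only asks the scheduler has already attempted
| // to schedule are reported as outstanding requests.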
| func TestCalculateOutstandingRequests(t *testing.T) { |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "unable to create partition: %v", err) |
| |
| // no application&asks |
| requests := partition.calculateOutstandingRequests() |
| assert.Equal(t, 0, len(requests)) |
| |
| // two applications with no asks |
| app1 := newApplication(appID1, "test", "root.default") |
| app2 := newApplication(appID2, "test", "root.default") |
| err = partition.AddApplication(app1) |
| assert.NilError(t, err) |
| err = partition.AddApplication(app2) |
| assert.NilError(t, err) |
| requests = partition.calculateOutstandingRequests() |
| assert.Equal(t, 0, len(requests)) |
| |
| // new asks for the two apps, but the scheduler hasn't processed them |
| askResource := resources.NewResourceFromMap(map[string]resources.Quantity{ |
| "vcores": 1, |
| "memory": 1, |
| }) |
| siAsk1 := &si.AllocationAsk{ |
| AllocationKey: "ask-uuid-1", |
| ApplicationID: appID1, |
| ResourceAsk: askResource.ToProto(), |
| } |
| siAsk2 := &si.AllocationAsk{ |
| AllocationKey: "ask-uuid-2", |
| ApplicationID: appID1, |
| ResourceAsk: askResource.ToProto(), |
| } |
| siAsk3 := &si.AllocationAsk{ |
| AllocationKey: "ask-uuid-3", |
| ApplicationID: appID2, |
| ResourceAsk: askResource.ToProto(), |
| } |
| err = partition.addAllocationAsk(siAsk1) |
| assert.NilError(t, err) |
| err = partition.addAllocationAsk(siAsk2) |
| assert.NilError(t, err) |
| err = partition.addAllocationAsk(siAsk3) |
| assert.NilError(t, err) |
| requests = partition.calculateOutstandingRequests() |
| assert.Equal(t, 0, len(requests)) |
| |
| // mark asks as attempted |
| app1.GetAllocationAsk("ask-uuid-1").SetSchedulingAttempted(true) |
| app1.GetAllocationAsk("ask-uuid-2").SetSchedulingAttempted(true) |
| app2.GetAllocationAsk("ask-uuid-3").SetSchedulingAttempted(true) |
| requests = partition.calculateOutstandingRequests() |
| total := resources.NewResource() |
| expectedTotal := resources.NewResourceFromMap(map[string]resources.Quantity{ |
| "memory": 3, |
| "vcores": 3, |
| }) |
| for _, req := range requests { |
| total.AddTo(req.GetAllocatedResource()) |
| } |
| assert.Equal(t, 3, len(requests)) |
| assert.Assert(t, resources.Equals(expectedTotal, total), "total resource expected: %v, got: %v", expectedTotal, total) |
| } |
| |
| func TestPlaceholderAllocationAndReplacementAfterRecovery(t *testing.T) { |
| // verify the following (YUNIKORN-2562): |
| // 1. Have a recovered, existing PH allocation (ph-1) from a node with task group "tg-1" |
| // 2. Have a new PH ask (ph-2) with task group "tg-2" |
| // 3. Have a real ask with task group "tg-1" |
| // 4. EXPECTED: successful allocation for the pending ask (ph-2) |
| // 5. EXPECTED: successful placeholder allocation (replacement) |
| // 6. EXPECTED: successful removal of ph-1 allocation |
| setupUGM() |
| partition, err := newBasePartition() |
| assert.NilError(t, err, "partition create failed") |
| |
| // add a new app |
| app := newApplication(appID1, "default", defQueue) |
| err = partition.AddApplication(app) |
| assert.NilError(t, err, "add application to partition should not have failed") |
| |
| // add a node with allocation |
| nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) |
| node1 := newNodeMaxResource(nodeID1, nodeRes) |
| appRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) |
| phAsk := newAllocationAskTG("placeholder", appID1, taskGroup, appRes, true) |
| ph := objects.NewAllocation(nodeID1, phAsk) |
| allocs := []*objects.Allocation{ph} |
| err = partition.AddNode(node1, allocs) |
| assert.NilError(t, err) |
| |
| // add a placeholder ask with a different taskgroup |
| phAsk2 := newAllocationAskTG("placeholder2", appID1, "tg-2", appRes, true) |
| err = app.AddAllocationAsk(phAsk2) |
| assert.NilError(t, err, "failed to add placeholder ask") |
| |
| realAsk := newAllocationAskTG("real-alloc", appID1, taskGroup, appRes, false) |
| err = app.AddAllocationAsk(realAsk) |
| assert.NilError(t, err, "failed to add real ask") |
| |
| // get an allocation for "placeholder2" |
| alloc := partition.tryAllocate() |
| assert.Assert(t, alloc != nil, "no allocation occurred") |
| assert.Equal(t, objects.Allocated, alloc.GetResult()) |
| assert.Equal(t, "placeholder2", alloc.GetAllocationKey()) |
| assert.Equal(t, "tg-2", alloc.GetTaskGroup()) |
| assert.Equal(t, "node-1", alloc.GetNodeID()) |
| |
| // real allocation gets replaced |
| alloc = partition.tryPlaceholderAllocate() |
| assert.Assert(t, alloc != nil, "no placeholder replacement occurred") |
| assert.Equal(t, objects.Replaced, alloc.GetResult()) |
| assert.Equal(t, "real-alloc", alloc.GetAllocationKey()) |
| assert.Equal(t, "tg-1", alloc.GetTaskGroup()) |
| |
| // remove the terminated placeholder allocation |
| released, confirmed := partition.removeAllocation(&si.AllocationRelease{ |
| ApplicationID: appID1, |
| TerminationType: si.TerminationType_PLACEHOLDER_REPLACED, |
| AllocationKey: "placeholder", |
| }) |
| assert.Assert(t, released == nil, "unexpected released allocation") |
| assert.Assert(t, confirmed != nil, "expected to have a confirmed allocation") |
| assert.Equal(t, "real-alloc", confirmed.GetAllocationKey()) |
| assert.Equal(t, "tg-1", confirmed.GetTaskGroup()) |
| } |