blob: dcf6d8c110738b29a488be546afca19312e3e401 [file] [log] [blame]
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
package tests
import (
siCommon ""
const configDataSmokeTest = `
- name: default
- name: root
submitacl: "*"
- name: singleleaf
memory: 150M
vcore: 20
// Test scheduler reconfiguration
func TestConfigScheduler(t *testing.T) {
configData := `
- name: default
- name: root
submitacl: "*"
- name: base
guaranteed: {memory: 100M, vcore: 10}
max: {memory: 150M, vcore: 20}
- name: tobedeleted
// Start all tests
ms := &mockScheduler{}
defer ms.Stop()
err := ms.Init(configData, false, false)
assert.NilError(t, err, "RegisterResourceManager failed")
// memorize the checksum of current configs
configChecksum := configs.ConfigContext.Get("policygroup").Checksum
// Check queues of cache and scheduler.
part := ms.scheduler.GetClusterContext().GetPartition(partition)
assert.Assert(t, part.GetTotalPartitionResource() == nil, "partition info max resource not nil, first load")
// Check the queue root
rootQ := part.GetQueue("root")
assert.Assert(t, rootQ.GetMaxResource() == nil, "root queue max resource not nil, first load")
// Check the queue root.base
queue := part.GetQueue("root.base")
assert.Equal(t, int(queue.GetMaxResource().Resources[siCommon.Memory]), 150000000, "max resource on leaf not set correctly")
// Check the queue which will be removed
queue = part.GetQueue("root.tobedeleted")
assert.Assert(t, queue.IsRunning(), "child queue root.tobedeleted is not in running state")
configData = `
- name: default
- name: root
submitacl: "*"
- name: base
guaranteed: {memory: 500M, vcore: 250m}
max: {memory: 1G, vcore: 500m}
- name: tobeadded
something: withAvalue
- name: gpu
- name: production
- name: test
gpu: test queue property
request := si.UpdateConfigurationRequest{
RmID: "rm:123",
PolicyGroup: "policygroup",
Config: configData,
err = ms.proxy.UpdateConfiguration(&request)
assert.NilError(t, err, "configuration reload failed")
// wait until configuration is reloaded
if err = common.WaitFor(time.Second, 5*time.Second, func() bool {
return configs.ConfigContext.Get("policygroup").Checksum != configChecksum
}); err != nil {
t.Errorf("timeout waiting for configuration to be reloaded: %v", err)
// Check queues of cache and scheduler.
assert.Assert(t, part.GetTotalPartitionResource() == nil, "partition info max resource not nil, reload")
// Check the queue root
rootQ = part.GetQueue("root")
assert.Assert(t, rootQ.GetMaxResource() == nil, "root queue max resource not nil, reload")
// Check the queue root.base
queue = part.GetQueue("root.base")
assert.Equal(t, int(queue.GetMaxResource().Resources[siCommon.Memory]), 1000000000, "max resource on leaf not set correctly")
assert.Equal(t, int(queue.GetGuaranteedResource().Resources[siCommon.CPU]), 250, "guaranteed resource on leaf not set correctly")
queue = part.GetQueue("root.tobeadded")
assert.Assert(t, queue != nil, "queue root.tobeadded is not found")
// check the removed queue state
queue = part.GetQueue("root.tobedeleted")
assert.Assert(t, queue.IsDraining(), "child queue root.tobedeleted is not in draining state")
// Check queues of cache and scheduler for the newly added partition
part = ms.scheduler.GetClusterContext().GetPartition("[rm:123]gpu")
assert.Assert(t, part != nil, "gpu partition not found")
assert.Assert(t, part.GetTotalPartitionResource() == nil, "GPU partition info max resource not nil")
// Check queue root
rootQ = ms.getPartitionQueue("root", "[rm:123]gpu")
assert.Assert(t, rootQ.GetMaxResource() == nil, "max resource on root set")
// Check queue root.production
queue = ms.getPartitionQueue("root.production", "[rm:123]gpu")
assert.Assert(t, queue != nil, "New partition: queue root.production is not found")
// Test basic interactions from rm proxy to cache and to scheduler.
func TestBasicScheduler(t *testing.T) {
// Register RM
// Start all tests
ms := &mockScheduler{}
defer ms.Stop()
err := ms.Init(configDataSmokeTest, false, false)
assert.NilError(t, err, "RegisterResourceManager failed")
leafName := "root.singleleaf"
// Check queues of cache and scheduler.
part := ms.scheduler.GetClusterContext().GetPartition(partition)
assert.Assert(t, part.GetTotalPartitionResource() == nil, "partition info max resource nil")
// Check the queue root
root := part.GetQueue("root")
assert.Assert(t, root.GetMaxResource() == nil, "root queue max resource should be nil")
// Check the queue singleleaf
leaf := part.GetQueue(leafName)
assert.Equal(t, int(leaf.GetMaxResource().Resources[siCommon.Memory]), 150000000, "%s config not set correct", leafName)
// Register a node, and add apps
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
NodeID: "node-1:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
NodeID: "node-2:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
RmID: "rm:123",
assert.NilError(t, err, "NodeRequest failed")
err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
New: newAddAppRequest(map[string]string{appID1: leafName}),
RmID: "rm:123",
assert.NilError(t, err, "ApplicationRequest failed")
ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
// Get the app
app := ms.getApplication(appID1)
// Verify app initial state
var app01 *objects.Application
app01, err = getApplication(part, appID1)
assert.NilError(t, err, "application not found")
assert.Equal(t, app01.CurrentState(), objects.New.String())
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Asks: []*si.AllocationAsk{
AllocationKey: "alloc-1a",
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
ApplicationID: appID1,
AllocationKey: "alloc-1b",
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
ApplicationID: appID1,
RmID: "rm:123",
assert.NilError(t, err, "AllocationRequest 2 failed")
// Wait pending resource of queue a and scheduler queue
// Both pending memory = 10 * 2 = 20;
waitForPendingQueueResource(t, leaf, 20000000, 1000)
waitForPendingQueueResource(t, root, 20000000, 1000)
waitForPendingAppResource(t, app, 20000000, 1000)
assert.Equal(t, app01.CurrentState(), objects.Accepted.String())
ms.mockRM.waitForAllocations(t, 2, 1000)
// Make sure pending resource updated to 0
waitForPendingQueueResource(t, leaf, 0, 1000)
waitForPendingQueueResource(t, root, 0, 1000)
waitForPendingAppResource(t, app, 0, 1000)
// Check allocated resources of queues, apps
assert.Equal(t, int(leaf.GetAllocatedResource().Resources[siCommon.Memory]), 20000000, "leaf allocated memory incorrect")
assert.Equal(t, int(root.GetAllocatedResource().Resources[siCommon.Memory]), 20000000, "root allocated memory incorrect")
assert.Equal(t, int(app.GetAllocatedResource().Resources[siCommon.Memory]), 20000000, "app allocated memory incorrect")
// once we start to process allocation asks from this app, verify the state again
assert.Equal(t, app01.CurrentState(), objects.Running.String())
// Check allocated resources of nodes
waitForAllocatedNodeResource(t, ms.scheduler.GetClusterContext(), ms.partitionName, []string{"node-1:1234", "node-2:1234"}, 20000000, 1000)
// ask for 4 more tasks
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Asks: []*si.AllocationAsk{
AllocationKey: "alloc-2a",
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 50000000},
"vcore": {Value: 5000},
ApplicationID: appID1,
AllocationKey: "alloc-2b",
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 50000000},
"vcore": {Value: 5000},
ApplicationID: appID1,
AllocationKey: "alloc-3a",
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 5000},
ApplicationID: appID1,
AllocationKey: "alloc-3b",
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 5000},
ApplicationID: appID1,
RmID: "rm:123",
assert.NilError(t, err, "AllocationRequest 3 failed")
// Wait pending resource of queue a and scheduler queue
// Both pending memory = 50 * 2 + 100 * 2 = 300;
waitForPendingQueueResource(t, leaf, 300000000, 1000)
waitForPendingQueueResource(t, root, 300000000, 1000)
waitForPendingAppResource(t, app, 300000000, 1000)
// Now app-1 uses 20 resource, and queue-a's max = 150, so it can get two 50 container allocated.
ms.mockRM.waitForAllocations(t, 4, 1000)
// Check pending resource, should be 200 now.
waitForPendingQueueResource(t, leaf, 200000000, 1000)
waitForPendingQueueResource(t, root, 200000000, 1000)
waitForPendingAppResource(t, app, 200000000, 1000)
// Check allocated resources of queues, apps
assert.Equal(t, int(leaf.GetAllocatedResource().Resources[siCommon.Memory]), 120000000, "leaf allocated memory incorrect")
assert.Equal(t, int(root.GetAllocatedResource().Resources[siCommon.Memory]), 120000000, "root allocated memory incorrect")
assert.Equal(t, int(app.GetAllocatedResource().Resources[siCommon.Memory]), 120000000, "app allocated memory incorrect")
// Check allocated resources of nodes
waitForAllocatedNodeResource(t, ms.scheduler.GetClusterContext(), partition, []string{"node-1:1234", "node-2:1234"}, 120000000, 1000)
updateRequest := &si.AllocationRequest{
Releases: &si.AllocationReleasesRequest{
AllocationsToRelease: make([]*si.AllocationRelease, 0),
RmID: "rm:123",
// Release all allocations
for _, v := range ms.mockRM.getAllocations() {
updateRequest.Releases.AllocationsToRelease = append(updateRequest.Releases.AllocationsToRelease, &si.AllocationRelease{
AllocationKey: v.AllocationKey,
ApplicationID: v.ApplicationID,
PartitionName: v.PartitionName,
// Release Allocations.
err = ms.proxy.UpdateAllocation(updateRequest)
assert.NilError(t, err, "AllocationRequest 4 failed")
ms.mockRM.waitForAllocations(t, 0, 1000)
// Check pending resource, should be 200 (same)
waitForPendingQueueResource(t, leaf, 200000000, 1000)
waitForPendingQueueResource(t, root, 200000000, 1000)
waitForPendingAppResource(t, app, 200000000, 1000)
// Check allocated resources of queues, apps should be 0 now
assert.Equal(t, int(leaf.GetAllocatedResource().Resources[siCommon.Memory]), 0, "leaf allocated memory incorrect")
assert.Equal(t, int(root.GetAllocatedResource().Resources[siCommon.Memory]), 0, "root allocated memory incorrect")
assert.Equal(t, int(app.GetAllocatedResource().Resources[siCommon.Memory]), 0, "app allocated memory incorrect")
// Release asks
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Releases: &si.AllocationReleasesRequest{
AllocationAsksToRelease: []*si.AllocationAskRelease{
ApplicationID: appID1,
PartitionName: "default",
RmID: "rm:123",
assert.NilError(t, err, "AllocationRequest 5 failed")
// Check pending resource
waitForPendingQueueResource(t, leaf, 0, 1000)
waitForPendingQueueResource(t, root, 0, 1000)
waitForPendingAppResource(t, app, 0, 1000)
func TestBasicSchedulerAutoAllocation(t *testing.T) {
ms := &mockScheduler{}
defer ms.Stop()
err := ms.Init(configDataSmokeTest, true, false)
assert.NilError(t, err, "RegisterResourceManager failed")
leafName := "root.singleleaf"
appID := appID1
// Register a node, and add apps
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
NodeID: "node-1:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
NodeID: "node-2:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
RmID: "rm:123",
assert.NilError(t, err, "NodeRequest failed")
err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
New: newAddAppRequest(map[string]string{appID: leafName}),
RmID: "rm:123",
assert.NilError(t, err, "ApplicationRequest failed")
ms.mockRM.waitForAcceptedApplication(t, appID, 1000)
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
asks := make([]*si.AllocationAsk, 20)
for i := 0; i < 20; i++ {
asks[i] = &si.AllocationAsk{
AllocationKey: fmt.Sprintf("alloc-%d", i),
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
ApplicationID: appID,
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Asks: asks,
RmID: "rm:123",
assert.NilError(t, err, "AllocationRequest 2 failed")
// wait until we have maxed out the leaf queue
ms.mockRM.waitForAllocations(t, 15, 1000)
// Check queue root
root := ms.getQueue("root")
// Check the leaf queue
leaf := ms.getQueue(leafName)
// Get app
app := ms.getApplication(appID)
// Make sure pending resource decreased to 50
waitForPendingQueueResource(t, leaf, 50000000, 1000)
waitForPendingQueueResource(t, root, 50000000, 1000)
waitForPendingAppResource(t, app, 50000000, 1000)
// Check allocated resources of queues, apps
assert.Equal(t, int(leaf.GetAllocatedResource().Resources[siCommon.Memory]), 150000000, "leaf allocated memory incorrect")
assert.Equal(t, int(root.GetAllocatedResource().Resources[siCommon.Memory]), 150000000, "root allocated memory incorrect")
assert.Equal(t, int(app.GetAllocatedResource().Resources[siCommon.Memory]), 150000000, "app allocated memory incorrect")
// Check allocated resources of nodes
waitForAllocatedNodeResource(t, ms.scheduler.GetClusterContext(), partition, []string{"node-1:1234", "node-2:1234"}, 150000000, 1000)
func TestFairnessAllocationForQueues(t *testing.T) {
configData := `
- name: default
- name: root
submitacl: "*"
- name: leaf1
- name: leaf2
ms := &mockScheduler{}
defer ms.Stop()
err := ms.Init(configData, false, false)
assert.NilError(t, err, "RegisterResourceManager failed")
leafApp1 := "root.leaf1"
app1ID := appID1
leafApp2 := "root.leaf2"
app2ID := appID2
// Register a node, and add apps
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
NodeID: "node-1:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
NodeID: "node-2:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
RmID: "rm:123",
assert.NilError(t, err, "NodeRequest failed")
err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
New: newAddAppRequest(map[string]string{app1ID: leafApp1, app2ID: leafApp2}),
RmID: "rm:123",
assert.NilError(t, err, "ApplicationRequest failed")
ms.mockRM.waitForAcceptedApplication(t, app1ID, 1000)
ms.mockRM.waitForAcceptedApplication(t, app2ID, 1000)
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
asks := make([]*si.AllocationAsk, 40)
appIDs := []string{app1ID, app2ID}
for i := 0; i < 40; i++ {
asks[i] = &si.AllocationAsk{
AllocationKey: fmt.Sprintf("alloc-%d", i),
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
ApplicationID: appIDs[(i / 20)],
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Asks: asks,
RmID: "rm:123",
assert.NilError(t, err, "AllocationRequest 2 failed")
// Check the queue root
root := ms.getQueue("root")
// Check the leaf queue app1
leaf1 := ms.getQueue(leafApp1)
app1 := ms.getApplication(app1ID)
// Check the leaf queue app2
leaf2 := ms.getQueue(leafApp2)
app2 := ms.getApplication(app2ID)
// Check pending resource, should be 100 (same)
waitForPendingQueueResource(t, leaf1, 200000000, 1000)
waitForPendingQueueResource(t, leaf2, 200000000, 1000)
waitForPendingQueueResource(t, root, 400000000, 1000)
ms.mockRM.waitForAllocations(t, 20, 1500)
waitForAllocatedAppResource(t, app1, 100000000, 1000)
waitForAllocatedAppResource(t, app2, 100000000, 1000)
// Make sure pending resource updated to 0
waitForPendingQueueResource(t, leaf1, 100000000, 1000)
waitForPendingQueueResource(t, leaf2, 100000000, 1000)
waitForPendingQueueResource(t, root, 200000000, 1000)
func TestFairnessAllocationForApplications(t *testing.T) {
configData := `
- name: default
- name: root
submitacl: "*"
- name: leaf
application.sort.policy: fair
ms := &mockScheduler{}
defer ms.Stop()
err := ms.Init(configData, false, false)
assert.NilError(t, err, "RegisterResourceManager failed")
leafName := "root.leaf"
app1ID := appID1
app2ID := appID2
// Register a node, and add applications
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
NodeID: "node-1:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
NodeID: "node-2:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
RmID: "rm:123",
assert.NilError(t, err, "NodeRequest failed")
err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
New: newAddAppRequest(map[string]string{app1ID: leafName, app2ID: leafName}),
RmID: "rm:123",
assert.NilError(t, err, "ApplicationRequest failed")
ms.mockRM.waitForAcceptedApplication(t, app1ID, 1000)
ms.mockRM.waitForAcceptedApplication(t, app2ID, 1000)
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
asks := make([]*si.AllocationAsk, 40)
appIDs := []string{app1ID, app2ID}
for i := 0; i < 40; i++ {
asks[i] = &si.AllocationAsk{
AllocationKey: fmt.Sprintf("alloc-%d", i),
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
ApplicationID: appIDs[i/20],
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Asks: asks,
RmID: "rm:123",
assert.NilError(t, err, "AllocationRequest 2 failed")
// Check the queue root
root := ms.getQueue("root")
// Check the queue a
leaf := ms.getQueue(leafName)
// Get the app
app1 := ms.getApplication(appID1)
app2 := ms.getApplication(appID2)
waitForPendingQueueResource(t, leaf, 400000000, 1000)
waitForPendingQueueResource(t, root, 400000000, 1000)
waitForPendingAppResource(t, app1, 200000000, 1000)
waitForPendingAppResource(t, app2, 200000000, 1000)
ms.mockRM.waitForAllocations(t, 20, 1000)
// Make sure pending resource updated to 100, which means
waitForPendingQueueResource(t, leaf, 200000000, 1000)
waitForPendingQueueResource(t, root, 200000000, 1000)
waitForPendingAppResource(t, app1, 100000000, 1000)
waitForPendingAppResource(t, app2, 100000000, 1000)
// Both apps got 100 resources,
assert.Equal(t, int(app1.GetAllocatedResource().Resources[siCommon.Memory]), 100000000, "app1 allocated resource incorrect")
assert.Equal(t, int(app2.GetAllocatedResource().Resources[siCommon.Memory]), 100000000, "app2 allocated resource incorrect")
func TestRejectApplications(t *testing.T) {
configData := `
- name: default
- name: root
submitacl: "*"
- name: leaf
application.sort.policy: fair
ms := &mockScheduler{}
defer ms.Stop()
err := ms.Init(configData, false, false)
assert.NilError(t, err, "RegisterResourceManager failed")
// Register a node, and add applications
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
NodeID: "node-1:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
RmID: "rm:123",
assert.NilError(t, err, "NodeRequest failed")
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
New: newAddAppRequest(map[string]string{"app-reject-1": "root.non-exist-queue"}),
RmID: "rm:123",
assert.NilError(t, err, "UpdateRequest 2 failed")
ms.mockRM.waitForRejectedApplication(t, "app-reject-1", 1000)
err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
New: newAddAppRequest(map[string]string{"app-added-2": "root.leaf"}),
RmID: "rm:123",
assert.NilError(t, err, "UpdateRequest 3 failed")
ms.mockRM.waitForAcceptedApplication(t, "app-added-2", 1000)
func TestSchedulingOverMaxCapacity(t *testing.T) {
var parameters = []struct {
name string
configData string
leafQueue string
askMemory int64
askCPU int64
numOfAsk int32
- name: default
- name: root
submitacl: "*"
- name: parent
memory: 100M
vcore: 10
- name: child-1
- name: child-2
`, "root.parent.child-1", 10000000, 1000, 12},
- name: default
- name: root
submitacl: "*"
- name: default
memory: 100M
vcore: 10
`, "root.default", 10000000, 1000, 12},
for _, param := range parameters {
t.Run(, func(t *testing.T) {
ms := &mockScheduler{}
defer ms.Stop()
err := ms.Init(param.configData, false, false)
assert.NilError(t, err, "RegisterResourceManager failed in run %s",
// Register a node, and add applications
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
NodeID: "node-1:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 150000000},
"vcore": {Value: 15000},
Action: si.NodeInfo_CREATE,
RmID: "rm:123",
assert.NilError(t, err, "NodeRequest failed in run %s",
err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
New: newAddAppRequest(map[string]string{appID1: param.leafQueue}),
RmID: "rm:123",
assert.NilError(t, err, "ApplicationRequest failed in run %s",
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
asks := make([]*si.AllocationAsk, param.numOfAsk)
for i := int32(0); i < param.numOfAsk; i++ {
asks[i] = &si.AllocationAsk{
AllocationKey: fmt.Sprintf("alloc-%d", i),
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: param.askMemory},
"vcore": {Value: param.askCPU},
ApplicationID: appID1,
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Asks: asks,
RmID: "rm:123",
assert.NilError(t, err, "AllocationRequest 2 failed in run %s",
leaf := ms.getQueue(param.leafQueue)
waitForPendingQueueResource(t, leaf, 120000000, 1000)
// 100 memory gets allocated, 20 pending because the capacity is 100
waitForPendingQueueResource(t, leaf, 20000000, 1000)
app1 := ms.getApplication(appID1)
if app1 == nil {
t.Fatal("application 'app-1' not found in cache")
waitForAllocatedAppResource(t, app1, 100000000, 1000)
assert.Equal(t, len(app1.GetAllAllocations()), 10, "number of app allocations incorrect")
assert.Equal(t, int(app1.GetAllocatedResource().Resources[siCommon.Memory]), 100000000, "app allocated resource incorrect")
assert.Equal(t, len(ms.mockRM.getAllocations()), 10, "number of RM allocations incorrect")
// release all allocated allocations
allocReleases := make([]*si.AllocationRelease, 0)
for _, alloc := range ms.mockRM.getAllocations() {
allocReleases = append(allocReleases, &si.AllocationRelease{
PartitionName: "default",
ApplicationID: appID1,
AllocationKey: alloc.AllocationKey,
Message: "",
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Releases: &si.AllocationReleasesRequest{
AllocationsToRelease: allocReleases,
RmID: "rm:123",
assert.NilError(t, err, "AllocationRequest 3 failed in run %s",
waitForAllocatedQueueResource(t, leaf, 0, 1000)
// schedule again, pending requests should be satisfied now
waitForPendingQueueResource(t, leaf, 0, 1000)
ms.mockRM.waitForAllocations(t, 2, 1000)
assert.Equal(t, len(ms.mockRM.getAllocations()), 2)
// Test basic interactions from rm proxy to cache and to scheduler.
func TestRMNodeActions(t *testing.T) {
configData := `
- name: default
- name: root
submitacl: "*"
ms := &mockScheduler{}
defer ms.Stop()
err := ms.Init(configData, false, false)
assert.NilError(t, err, "RegisterResourceManager failed")
node1ID := "node-1:1234"
node2ID := "node-2:1234"
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
NodeID: node1ID,
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
NodeID: node2ID,
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
RmID: "rm:123",
assert.NilError(t, err, "NodeRequest failed")
ms.mockRM.waitForAcceptedNode(t, node1ID, 1000)
ms.mockRM.waitForAcceptedNode(t, node2ID, 1000)
// verify the nodes
context := ms.scheduler.GetClusterContext()
waitForNewNode(t, context, node1ID, partition, 1000)
waitForNewNode(t, context, node2ID, partition, 1000)
// verify all nodes are schedule-able
assert.Equal(t, ms.scheduler.GetClusterContext().GetPartition(partition).GetNode(node1ID).IsSchedulable(), true)
assert.Equal(t, ms.scheduler.GetClusterContext().GetPartition(partition).GetNode(node2ID).IsSchedulable(), true)
// send RM node action DRAIN_NODE (move to unschedulable)
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
NodeID: node1ID,
Action: si.NodeInfo_DRAIN_NODE,
RmID: "rm:123",
assert.NilError(t, err, "NodeRequest 2 failed")
err = common.WaitFor(10*time.Millisecond, 10*time.Second, func() bool {
return !ms.scheduler.GetClusterContext().GetPartition(partition).GetNode(node1ID).IsSchedulable() &&
assert.NilError(t, err, "timed out waiting for node in cache")
// send RM node action: DRAIN_TO_SCHEDULABLE (make it schedulable again)
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
NodeID: node1ID,
RmID: "rm:123",
assert.NilError(t, err, "NodeRequest 3 failed")
err = common.WaitFor(10*time.Millisecond, 10*time.Second, func() bool {
return ms.scheduler.GetClusterContext().GetPartition(partition).GetNode(node1ID).IsSchedulable() &&
assert.NilError(t, err, "timed out waiting for node in cache")
// send RM node action: DECOMMISSION (make it unschedulable and tell partition to delete)
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
NodeID: node2ID,
Action: si.NodeInfo_DECOMISSION,
RmID: "rm:123",
assert.NilError(t, err, "NodeRequest 3 failed")
// node removal can be really quick: cannot test for unschedulable state (nil pointer)
// verify that the node (node-2) was removed
waitForRemovedNode(t, context, node2ID, partition, 10000)
func TestBinPackingAllocationForApplications(t *testing.T) {
configData := `
name: default
type: binpacking
- name: root
submitacl: "*"
- name: leaf
ms := &mockScheduler{}
defer ms.Stop()
err := ms.Init(configData, false, false)
assert.NilError(t, err, "RegisterResourceManager failed")
leafName := "root.leaf"
app1ID := appID1
app2ID := appID2
// Register a node, and add applications
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
NodeID: "node-1:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
NodeID: "node-2:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
RmID: "rm:123",
assert.NilError(t, err, "NodeRequest failed")
err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
New: newAddAppRequest(map[string]string{app1ID: leafName, app2ID: leafName}),
RmID: "rm:123",
assert.NilError(t, err, "ApplicationRequest failed")
ms.mockRM.waitForAcceptedApplication(t, app1ID, 1000)
ms.mockRM.waitForAcceptedApplication(t, app2ID, 1000)
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
asks := make([]*si.AllocationAsk, 40)
appIDs := []string{app1ID, app2ID}
for i := 0; i < 40; i++ {
asks[i] = &si.AllocationAsk{
AllocationKey: fmt.Sprintf("alloc-%d", i),
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
ApplicationID: appIDs[i/20],
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Asks: asks,
RmID: "rm:123",
assert.NilError(t, err, "UpdateRequest 2 failed")
// Check the queue root
root := ms.getQueue("root")
// Check the queue a
leaf := ms.getQueue(leafName)
// Get the app
app1 := ms.getApplication(app1ID)
app2 := ms.getApplication(app2ID)
waitForPendingQueueResource(t, leaf, 400000000, 1000)
waitForPendingQueueResource(t, root, 400000000, 1000)
waitForPendingAppResource(t, app1, 200000000, 1000)
waitForPendingAppResource(t, app2, 200000000, 1000)
ms.mockRM.waitForAllocations(t, 9, 1000)
node1Alloc := ms.scheduler.GetClusterContext().GetPartition(partition).GetNode("node-1:1234").GetAllocatedResource().Resources[siCommon.Memory]
node2Alloc := ms.scheduler.GetClusterContext().GetPartition(partition).GetNode("node-2:1234").GetAllocatedResource().Resources[siCommon.Memory]
// we do not know which node was chosen so we need to check:
// node1 == 90 && node2 == 0 || node1 == 0 && node2 == 90
if !(node1Alloc == 90000000 && node2Alloc == 0) && !(node1Alloc == 0 && node2Alloc == 90000000) {
t.Errorf("allocation not contained on one: node1 = %d, node2 = %d", node1Alloc, node2Alloc)
func TestFairnessAllocationForNodes(t *testing.T) {
configData := `
- name: default
- name: root
submitacl: "*"
- name: leaf
ms := &mockScheduler{}
defer ms.Stop()
err := ms.Init(configData, false, false)
assert.NilError(t, err, "RegisterResourceManager failed")
// Register 10 nodes, and add applications
nodes := make([]*si.NodeInfo, 0)
for i := 0; i < 10; i++ {
nodeID := "node-" + strconv.Itoa(i)
nodes = append(nodes, &si.NodeInfo{
NodeID: nodeID + ":1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
leafName := "root.leaf"
appID := appID1
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: nodes,
RmID: "rm:123",
assert.NilError(t, err, "NodeRequest failed")
err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
New: newAddAppRequest(map[string]string{appID: leafName}),
RmID: "rm:123",
assert.NilError(t, err, "ApplicationRequest failed")
// verify app and all nodes are accepted
ms.mockRM.waitForAcceptedApplication(t, appID, 1000)
for _, node := range nodes {
ms.mockRM.waitForAcceptedNode(t, node.NodeID, 1000)
context := ms.scheduler.GetClusterContext()
for _, node := range nodes {
waitForNewNode(t, context, node.NodeID, partition, 1000)
// Request 20 allocations
asks := make([]*si.AllocationAsk, 20)
for i := 0; i < 20; i++ {
asks[i] = &si.AllocationAsk{
AllocationKey: fmt.Sprintf("alloc-%d", i),
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
ApplicationID: appID,
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Asks: asks,
RmID: "rm:123",
assert.NilError(t, err, "AllocationRequest failed")
leaf := ms.getQueue(leafName)
app := ms.getApplication(appID)
waitForPendingQueueResource(t, leaf, 200000000, 1000)
waitForPendingAppResource(t, app, 200000000, 1000)
// Verify all requests are satisfied
ms.mockRM.waitForAllocations(t, 20, 1000)
waitForPendingQueueResource(t, leaf, 0, 1000)
waitForPendingAppResource(t, app, 0, 1000)
assert.Equal(t, int(app.GetAllocatedResource().Resources[siCommon.Memory]), 200000000)
// Verify 2 allocations for every node
for _, node := range nodes {
node := ms.scheduler.GetClusterContext().GetNode(node.NodeID, partition)
assert.Equal(t, int(node.GetAllocatedResource().Resources[siCommon.Memory]), 20000000, "node %s did not get 2 allocated", node.NodeID)
// this test cases covers a basic gang scheduling scenario, where an app has
// 1 member in the gang and 1 actual request, it verifies each step of the
// placeholder replacement works as expected.
// it does an extra verification at the end, by simulating a dup release allocation
// request, ensure this won't cause any NPE issue.
// nolint: funlen
func TestDupReleasesInGangScheduling(t *testing.T) {
// Register RM
// Start all tests
ms := &mockScheduler{}
defer ms.Stop()
err := ms.Init(configDataSmokeTest, false, false)
assert.NilError(t, err, "RegisterResourceManager failed")
leafName := "root.singleleaf"
part := ms.scheduler.GetClusterContext().GetPartition(partition)
root := part.GetQueue("root")
leaf := part.GetQueue(leafName)
// Register a node, and add apps
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
NodeID: "node-1:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
NodeID: "node-2:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
RmID: "rm:123",
assert.NilError(t, err)
err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
New: newAddAppRequest(map[string]string{appID1: leafName}),
RmID: "rm:123",
assert.NilError(t, err)
ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
// Get the app
app := ms.getApplication(appID1)
// Verify app initial state
var app01 *objects.Application
app01, err = getApplication(part, appID1)
assert.NilError(t, err, "application not found")
assert.Equal(t, app01.CurrentState(), objects.New.String())
// shim side creates a placeholder, and send the UpdateRequest
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Asks: []*si.AllocationAsk{
AllocationKey: "alloc-1",
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
TaskGroupName: "tg",
Placeholder: true,
ApplicationID: appID1,
RmID: "rm:123",
assert.NilError(t, err, "AllocationRequest failed")
// schedule and make sure the placeholder gets allocated
ms.mockRM.waitForAllocations(t, 1, 1000)
// verify the placeholder allocation
assert.Equal(t, len(app01.GetAllAllocations()), 1)
assert.Equal(t, int(app.GetPlaceholderResource().Resources[siCommon.Memory]), 10000000,
"app allocated memory incorrect")
placeholderAlloc := app01.GetAllAllocations()[0]
// Check allocated resources of nodes
waitForAllocatedNodeResource(t, ms.scheduler.GetClusterContext(), ms.partitionName,
[]string{"node-1:1234", "node-2:1234"}, 10000000, 1000)
// shim submits the actual pod for scheduling
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Asks: []*si.AllocationAsk{
AllocationKey: "alloc-2",
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
ApplicationID: appID1,
Placeholder: false,
TaskGroupName: "tg",
RmID: "rm:123",
assert.NilError(t, err, "AllocationRequest 3 failed")
// schedule and this triggers a placeholder REPLACE
// the core releases the placeholder allocation,
// and it waits for the shim's confirmation
ms.mockRM.waitForAllocations(t, 0, 1000)
// shim responses the allocation has been released
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Releases: &si.AllocationReleasesRequest{
AllocationsToRelease: []*si.AllocationRelease{
PartitionName: "default",
ApplicationID: appID1,
AllocationKey: placeholderAlloc.GetAllocationKey(),
TerminationType: si.TerminationType_PLACEHOLDER_REPLACED,
RmID: "rm:123",
assert.NilError(t, err)
// schedule and verify the actual request gets allocated
ms.mockRM.waitForAllocations(t, 1, 1000)
// actual request gets allocated
// placeholder requests have been all released
// and queue has no pending resources
assert.Equal(t, int(app.GetAllocatedResource().Resources[siCommon.Memory]), 10000000)
assert.Equal(t, int(app.GetPlaceholderResource().Resources[siCommon.Memory]), 0,
"app allocated memory incorrect")
waitForPendingQueueResource(t, leaf, 0, 1000)
waitForPendingQueueResource(t, root, 0, 1000)
// simulate the shim sends the release request again
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Releases: &si.AllocationReleasesRequest{
AllocationsToRelease: []*si.AllocationRelease{
PartitionName: "default",
ApplicationID: appID1,
AllocationKey: placeholderAlloc.GetAllocationKey(),
TerminationType: si.TerminationType_PLACEHOLDER_REPLACED,
RmID: "rm:123",
assert.NilError(t, err)
ms.mockRM.waitForAllocations(t, 1, 1000)
func TestDynamicQueueCleanUp(t *testing.T) {
configData := `
- name: default
- name: root
submitacl: "*"
- name: fixed
value: cleanup_test
create: true
// Register RM
// Start all tests
ms := &mockScheduler{}
defer ms.Stop()
err := ms.Init(configData, false, false)
assert.NilError(t, err, "RegisterResourceManager failed")
// Check queues of cache and scheduler.
part := ms.scheduler.GetClusterContext().GetPartition(partition)
assert.Assert(t, part.GetTotalPartitionResource() == nil, "partition info max resource nil")
// Check the queue root
root := part.GetQueue("root")
assert.Assert(t, root.GetMaxResource() == nil, "root queue max resource should be nil")
leafName := ""
app1ID := appID1
// Register a node, and add apps
err = ms.proxy.UpdateNode(&si.NodeRequest{
Nodes: []*si.NodeInfo{
NodeID: "node-1:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
NodeID: "node-2:1234",
Attributes: map[string]string{},
SchedulableResource: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 100000000},
"vcore": {Value: 20000},
Action: si.NodeInfo_CREATE,
RmID: "rm:123",
assert.NilError(t, err, "NodeRequest failed")
err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
New: newAddAppRequest(map[string]string{app1ID: leafName}),
RmID: "rm:123",
assert.NilError(t, err, "ApplicationRequest failed")
ms.mockRM.waitForAcceptedApplication(t, app1ID, 1000)
ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
// Get the app
app := ms.getApplication(appID1)
// Get the queue cleanup_test
leafName = "root.cleanup_test"
leaf := part.GetQueue(leafName)
// Verify app initial state
var app01 *objects.Application
app01, err = getApplication(part, appID1)
assert.NilError(t, err, "application not found")
assert.Equal(t, app01.CurrentState(), objects.New.String())
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Asks: []*si.AllocationAsk{
AllocationKey: "alloc-1",
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
ApplicationID: appID1,
AllocationKey: "alloc-2",
ResourceAsk: &si.Resource{
Resources: map[string]*si.Quantity{
"memory": {Value: 10000000},
"vcore": {Value: 1000},
ApplicationID: appID1,
RmID: "rm:123",
assert.NilError(t, err, "AllocationRequest 2 failed")
// Wait pending resource of queue a and scheduler queue
// Both pending memory = 10 * 2 = 20;
waitForPendingQueueResource(t, leaf, 20000000, 1000)
waitForPendingQueueResource(t, root, 20000000, 1000)
waitForPendingAppResource(t, app, 20000000, 1000)
assert.Equal(t, app01.CurrentState(), objects.Accepted.String())
ms.mockRM.waitForAllocations(t, 2, 1000)
// Make sure pending resource updated to 0
waitForPendingQueueResource(t, leaf, 0, 1000)
waitForPendingQueueResource(t, root, 0, 1000)
waitForPendingAppResource(t, app, 0, 1000)
// Check allocated resources of queues, apps
assert.Equal(t, int(leaf.GetAllocatedResource().Resources[siCommon.Memory]), 20000000, "leaf allocated memory incorrect")
assert.Equal(t, int(root.GetAllocatedResource().Resources[siCommon.Memory]), 20000000, "root allocated memory incorrect")
assert.Equal(t, int(app.GetAllocatedResource().Resources[siCommon.Memory]), 20000000, "app allocated memory incorrect")
// once we start to process allocation asks from this app, verify the state again
assert.Equal(t, app01.CurrentState(), objects.Running.String())
// Check allocated resources of nodes
waitForAllocatedNodeResource(t, ms.scheduler.GetClusterContext(), ms.partitionName, []string{"node-1:1234", "node-2:1234"}, 20000000, 1000)
updateRequest := &si.AllocationRequest{
Releases: &si.AllocationReleasesRequest{
AllocationsToRelease: make([]*si.AllocationRelease, 0),
RmID: "rm:123",
// Release all allocations
for _, v := range ms.mockRM.getAllocations() {
updateRequest.Releases.AllocationsToRelease = append(updateRequest.Releases.AllocationsToRelease, &si.AllocationRelease{
AllocationKey: v.AllocationKey,
ApplicationID: v.ApplicationID,
PartitionName: v.PartitionName,
// Before release allocations, shorten the completingTimeout first, otherwise it will take 30 seconds for the app to become completed state.
objects.SetCompletingTimeout(time.Millisecond * 100)
defer objects.SetCompletingTimeout(time.Second * 30)
// Release Allocations.
err = ms.proxy.UpdateAllocation(updateRequest)
assert.NilError(t, err, "AllocationRequest 3 failed")
ms.mockRM.waitForAllocations(t, 0, 1000)
// Check allocated resources of queues, apps should be 0 now
assert.Equal(t, int(leaf.GetAllocatedResource().Resources[siCommon.Memory]), 0, "leaf allocated memory incorrect")
assert.Equal(t, int(root.GetAllocatedResource().Resources[siCommon.Memory]), 0, "root allocated memory incorrect")
assert.Equal(t, int(app.GetAllocatedResource().Resources[siCommon.Memory]), 0, "app allocated memory incorrect")
// Check app to Completing status
assert.Equal(t, app01.CurrentState(), objects.Completing.String())
// the app changes from completing state to completed state
err = common.WaitFor(1*time.Millisecond, time.Millisecond*200, app.IsCompleted)
assert.NilError(t, err, "App should be in Completed state")
// partition manager should be able to clean up the dynamically created queue.
if err = common.WaitFor(1*time.Millisecond, time.Second*11, func() bool {
return part.GetQueue(leafName) == nil
}); err != nil {
t.Errorf("timeout waiting for queue is cleared %v", err)