[YUNIKORN-1697][core] Make the namespace annotation support max applications updates. (#927)
fix golint
Address new comments
Fix test
fix logic
Increase test coverage
Split testing
Fix unit test
fix
increase coverage
Merge remote-tracking branch 'upstream/master' into YUNIKORN-1697
Add test coverage
Address new comments
Merge remote-tracking branch 'upstream/master' into YUNIKORN-1697
Address new comments
Merge remote-tracking branch 'upstream/master' into YUNIKORN-1697
Closes: #927
Signed-off-by: qzhu <qzhu@cloudera.com>
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index 504b6b0..350abf3 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -22,6 +22,7 @@
"context"
"fmt"
"math"
+ "strconv"
"strings"
"sync"
"time"
@@ -2157,6 +2158,32 @@
return sa.getResourceFromTags(siCommon.AppTagNamespaceResourceQuota)
}
+// GetMaxApps returns the maximum number of applications set in the application tags.
+// It returns 0 when the tag has no value or the value is not a valid unsigned integer.
+func (sa *Application) GetMaxApps() uint64 {
+	return sa.getUint64Tag(siCommon.AppTagNamespaceResourceMaxApps)
+}
+
+// getUint64Tag returns the value of the given application tag parsed as a base 10 uint64.
+// An empty value is treated as "not set" and yields 0 without logging, the same way
+// getResourceFromTags treats an empty value. Any other value that fails to parse is logged and yields 0.
+func (sa *Application) getUint64Tag(tag string) uint64 {
+	// read the tag once so the parsed value and the logged value cannot differ
+	value := sa.GetTag(tag)
+	if value == "" {
+		return 0
+	}
+	uintValue, err := strconv.ParseUint(value, 10, 64)
+	if err != nil {
+		log.Log(log.SchedApplication).Warn("application tag conversion failure",
+			zap.String("tag", tag),
+			zap.String("json string", value),
+			zap.Error(err))
+		return 0
+	}
+	return uintValue
+}
+
func (sa *Application) getResourceFromTags(tag string) *resources.Resource {
value := sa.GetTag(tag)
if value == "" {
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 3d2008f..9dfcdc5 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -108,46 +108,123 @@
assert.Assert(t, app.IsNew(), "new application must be in new state")
assert.Equal(t, app.execTimeout, defaultPlaceholderTimeout, "no timeout passed in should be modified default")
assert.Assert(t, resources.Equals(app.placeholderAsk, res), "placeholder ask not set as expected")
+}
+// TestNewApplicationWithAnnotationUpdate checks the max resource, guaranteed resource and max apps values read back from valid, missing, invalid and negative application tags.
+func TestNewApplicationWithAnnotationUpdate(t *testing.T) {
+ user := security.UserGroup{
+ User: "testuser",
+ Groups: []string{},
+ }
// valid tags
- siApp = &si.AddApplicationRequest{}
+ siApp := &si.AddApplicationRequest{}
siApp.Tags = map[string]string{
siCommon.AppTagNamespaceResourceQuota: "{\"resources\":{\"validMaxRes\":{\"value\":11}}}",
siCommon.AppTagNamespaceResourceGuaranteed: "{\"resources\":{\"validGuaranteed\":{\"value\":22}}}",
+ siCommon.AppTagNamespaceResourceMaxApps: "33",
}
- app = NewApplication(siApp, user, nil, "")
+
+ app := NewApplication(siApp, user, nil, "")
+
guaranteed := app.GetGuaranteedResource()
maxResource := app.GetMaxResource()
+ maxApps := app.GetMaxApps()
assert.Assert(t, guaranteed != nil, "guaranteed resource has not been set")
assert.Equal(t, 1, len(guaranteed.Resources), "more than one resource has been set")
assert.Equal(t, resources.Quantity(22), guaranteed.Resources["validGuaranteed"])
assert.Assert(t, maxResource != nil, "maximum resource has not been set")
assert.Equal(t, 1, len(maxResource.Resources), "more than one resource has been set")
assert.Equal(t, resources.Quantity(11), maxResource.Resources["validMaxRes"], "maximum resource is incorrect")
+ assert.Assert(t, maxApps != 0, "maximum apps has not been set or incorrect")
+ assert.Equal(t, uint64(33), maxApps, "maximum apps is incorrect")
+
+ // valid tags without max apps
+ siApp = &si.AddApplicationRequest{}
+ siApp.Tags = map[string]string{
+ siCommon.AppTagNamespaceResourceQuota: "{\"resources\":{\"validMaxRes\":{\"value\":11}}}",
+ siCommon.AppTagNamespaceResourceGuaranteed: "{\"resources\":{\"validGuaranteed\":{\"value\":22}}}",
+ }
+ app = NewApplication(siApp, user, nil, "")
+ guaranteed = app.GetGuaranteedResource()
+ maxResource = app.GetMaxResource()
+ maxApps = app.GetMaxApps()
+ assert.Assert(t, guaranteed != nil, "guaranteed resource has not been set")
+ assert.Equal(t, 1, len(guaranteed.Resources), "more than one resource has been set")
+ assert.Equal(t, resources.Quantity(22), guaranteed.Resources["validGuaranteed"])
+ assert.Assert(t, maxResource != nil, "maximum resource has not been set")
+ assert.Equal(t, 1, len(maxResource.Resources), "more than one resource has been set")
+ assert.Equal(t, resources.Quantity(11), maxResource.Resources["validMaxRes"], "maximum resource is incorrect")
+ assert.Assert(t, maxApps == 0, "maximum apps should have not been set")
// invalid tags
siApp = &si.AddApplicationRequest{}
siApp.Tags = map[string]string{
siCommon.AppTagNamespaceResourceQuota: "{xxxxxx}",
siCommon.AppTagNamespaceResourceGuaranteed: "{yyyyy}",
+ siCommon.AppTagNamespaceResourceMaxApps: "zzzzz",
}
app = NewApplication(siApp, user, nil, "")
guaranteed = app.GetGuaranteedResource()
maxResource = app.GetMaxResource()
+ maxApps = app.GetMaxApps()
assert.Assert(t, guaranteed == nil, "guaranteed resource should have not been set")
assert.Assert(t, maxResource == nil, "maximum resource should have not been set")
+ assert.Assert(t, maxApps == 0, "maximum apps should have not been set or incorrect")
// negative values
siApp = &si.AddApplicationRequest{}
siApp.Tags = map[string]string{
siCommon.AppTagNamespaceResourceQuota: "{\"resources\":{\"negativeMax\":{\"value\":-11}}}",
siCommon.AppTagNamespaceResourceGuaranteed: "{\"resources\":{\"negativeGuaranteed\":{\"value\":-22}}}",
+ siCommon.AppTagNamespaceResourceMaxApps: "-33",
}
app = NewApplication(siApp, user, nil, "")
guaranteed = app.GetGuaranteedResource()
maxResource = app.GetMaxResource()
+ maxApps = app.GetMaxApps()
assert.Assert(t, guaranteed == nil, "guaranteed resource should have not been set")
assert.Assert(t, maxResource == nil, "maximum resource should have not been set")
+ assert.Assert(t, maxApps == 0, "maximum apps should have not been set or incorrect")
+
+ // valid max apps
+ siApp = &si.AddApplicationRequest{}
+ siApp.Tags = map[string]string{
+ siCommon.AppTagNamespaceResourceMaxApps: "33",
+ }
+ app = NewApplication(siApp, user, nil, "")
+ maxApps = app.GetMaxApps()
+ guaranteed = app.GetGuaranteedResource()
+ maxResource = app.GetMaxResource()
+ assert.Assert(t, guaranteed == nil, "guaranteed resource should have not been set")
+ assert.Assert(t, maxResource == nil, "maximum resource should have not been set")
+ assert.Assert(t, maxApps != 0, "maximum apps has not been set or incorrect")
+ assert.Equal(t, uint64(33), maxApps, "maximum apps is incorrect")
+
+ // invalid max apps
+ siApp = &si.AddApplicationRequest{}
+ siApp.Tags = map[string]string{
+ siCommon.AppTagNamespaceResourceMaxApps: "zzzzz",
+ }
+ app = NewApplication(siApp, user, nil, "")
+ maxApps = app.GetMaxApps()
+ guaranteed = app.GetGuaranteedResource()
+ maxResource = app.GetMaxResource()
+ assert.Assert(t, guaranteed == nil, "guaranteed resource should have not been set")
+ assert.Assert(t, maxResource == nil, "maximum resource should have not been set")
+ assert.Assert(t, maxApps == 0, "maximum apps should have not been set or incorrect")
+
+ // negative max apps
+ siApp = &si.AddApplicationRequest{}
+ siApp.Tags = map[string]string{
+ siCommon.AppTagNamespaceResourceMaxApps: "-33",
+ }
+ app = NewApplication(siApp, user, nil, "")
+ maxApps = app.GetMaxApps()
+ guaranteed = app.GetGuaranteedResource()
+ maxResource = app.GetMaxResource()
+ assert.Assert(t, guaranteed == nil, "guaranteed resource should have not been set")
+ assert.Assert(t, maxResource == nil, "maximum resource should have not been set")
+ assert.Assert(t, maxApps == 0, "maximum apps should have not been set or incorrect")
}
// test basic reservations
@@ -2799,3 +2873,36 @@
defer sa.Unlock()
sa.appEvents = schedEvt.NewApplicationEvents(events.GetEventSystem())
}
+
+// TestGetUint64Tag verifies the uint64 conversion of application tag values:
+// only a valid unsigned number is returned, every other value yields 0.
+func TestGetUint64Tag(t *testing.T) {
+	type testCase struct {
+		name string
+		tag  string
+		want uint64
+	}
+	cases := []testCase{
+		{name: "Valid uint64 tag", tag: "validUintTag", want: 12345},
+		{name: "Negative uint64 tag", tag: "negativeUintTag", want: 0},
+		{name: "Invalid uint64 tag", tag: "invalidUintTag", want: 0},
+		{name: "Empty tag", tag: "emptyUintTag", want: 0},
+		{name: "Non-existent tag", tag: "nonExistentTag", want: 0},
+	}
+
+	// "nonExistentTag" is deliberately left out of the tags
+	sa := &Application{
+		tags: map[string]string{
+			"validUintTag":    "12345",
+			"negativeUintTag": "-12345",
+			"invalidUintTag":  "not-a-number",
+			"emptyUintTag":    "",
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			assert.Equal(t, tc.want, sa.getUint64Tag(tc.tag))
+		})
+	}
+}
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 71b12e8..cb11c90 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -388,7 +388,6 @@
zap.Error(err))
return err
}
-
sq.setResources(guaranteedResource, maxResource)
return nil
}
@@ -453,6 +452,16 @@
sq.setResources(guaranteedResource, maxResource)
}
+// SetMaxRunningApps sets the maximum number of running applications on the queue. A nil queue is ignored.
+func (sq *Queue) SetMaxRunningApps(maxApps uint64) {
+ if sq == nil {
+ return
+ }
+ sq.Lock()
+ defer sq.Unlock()
+ sq.maxRunningApps = maxApps
+}
+
// setTemplate sets the template on the queue based on the config.
// lock free call, must be called holding the queue lock or during create only
func (sq *Queue) setTemplate(conf configs.ChildTemplate) error {
@@ -601,7 +610,18 @@
return sq.guaranteedResource
}
-// GetActualGuaranteedResource returns the actual (including parent) guaranteed resources for the queue.
+// GetMaxApps returns the maximum number of applications that can run in this queue.
+// A nil queue returns 0, matching the nil handling of SetMaxRunningApps.
+func (sq *Queue) GetMaxApps() uint64 {
+	if sq == nil {
+		return 0
+	}
+	sq.RLock()
+	defer sq.RUnlock()
+	return sq.maxRunningApps
+}
+
+// GetActualGuaranteedResource returns the actual (including parent) guaranteed resources for the queue.
func (sq *Queue) GetActualGuaranteedResource() *resources.Resource {
if sq == nil {
return resources.NewResource()
@@ -1683,20 +1699,6 @@
return sq.preemptionPolicy
}
-// SetMaxRunningApps allows setting the maximum running apps on a queue
-// test only
-func (sq *Queue) SetMaxRunningApps(max int) {
- if sq == nil {
- return
- }
- if max < 0 {
- return
- }
- sq.Lock()
- defer sq.Unlock()
- sq.maxRunningApps = uint64(max)
-}
-
// FindEligiblePreemptionVictims is used to locate tasks which may be preempted for the given ask.
// queuePath is the fully-qualified path of the queue where ask resides
// ask is the ask we are attempting to preempt for
diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go
index cd0700c..c0792d5 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -2611,6 +2611,25 @@
}
}
+// TestQueueSetMaxRunningApps checks that the limit is stored on the queue and
+// that calling the setter on a nil queue does not panic.
+func TestQueueSetMaxRunningApps(t *testing.T) {
+	// turn a panic from the nil queue call at the end into a test failure
+	defer func() {
+		if recover() != nil {
+			t.Fatal("panic on nil queue setMaxRunningApps")
+		}
+	}()
+	const limit uint64 = 10
+
+	sq := &Queue{}
+	sq.SetMaxRunningApps(limit)
+	assert.Equal(t, limit, sq.maxRunningApps)
+
+	var nilQueue *Queue
+	nilQueue.SetMaxRunningApps(limit)
+}
+
func TestQueue_allocatedResFits_Other(t *testing.T) {
const first = "first"
const second = "second"
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index a614ee4..3a26182 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -342,13 +342,19 @@
guaranteedRes := app.GetGuaranteedResource()
maxRes := app.GetMaxResource()
- if !isRecoveryQueue && (guaranteedRes != nil || maxRes != nil) {
+ maxApps := app.GetMaxApps()
+ if !isRecoveryQueue && (guaranteedRes != nil || maxRes != nil || maxApps != 0) {
// set resources based on tags, but only if the queue is dynamic (unmanaged)
if queue.IsManaged() {
log.Log(log.SchedQueue).Warn("Trying to set resources on a queue that is not an unmanaged leaf",
zap.String("queueName", queue.QueuePath))
} else {
- queue.SetResources(guaranteedRes, maxRes)
+ if maxApps != 0 {
+ queue.SetMaxRunningApps(maxApps)
+ }
+ if guaranteedRes != nil || maxRes != nil {
+ queue.SetResources(guaranteedRes, maxRes)
+ }
}
}
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index f138d23..24f51fa 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -2808,6 +2808,7 @@
tags := map[string]string{
siCommon.AppTagNamespaceResourceGuaranteed: "{\"resources\":{\"vcore\":{\"value\":111}}}",
siCommon.AppTagNamespaceResourceQuota: "{\"resources\":{\"vcore\":{\"value\":2222}}}",
+ siCommon.AppTagNamespaceResourceMaxApps: "1",
}
app := newApplicationTGTags(appID1, "default", "root.limited", tgRes, tags)
err = partition.AddApplication(app)
@@ -2819,6 +2820,7 @@
"vcore": 1000,
})), "max resource changed unexpectedly")
assert.Assert(t, queue.GetGuaranteedResource() == nil)
+ assert.Equal(t, queue.GetMaxApps(), uint64(2), "max running apps should be 2")
// add a app with TG that does fit in the queue
limit = map[string]string{"vcore": "100"}
@@ -2832,6 +2834,7 @@
"vcore": 100000,
})), "max resource changed unexpectedly")
assert.Assert(t, queue.GetGuaranteedResource() == nil)
+ assert.Equal(t, queue.GetMaxApps(), uint64(2), "max running apps should be 2")
// add a app with TG that does fit in the queue as the resource is not limited in the queue
limit = map[string]string{"second": "100"}
@@ -2845,6 +2848,7 @@
"second": 100,
})), "max resource changed unexpectedly")
assert.Assert(t, queue.GetGuaranteedResource() == nil)
+ assert.Equal(t, queue.GetMaxApps(), uint64(2), "max running apps should be 2")
}
func TestAddTGAppDynamic(t *testing.T) {
@@ -2861,7 +2865,11 @@
assert.Equal(t, app.GetQueuePath(), "root.unlimited", "app-1 not placed in expected queue")
jsonMaxRes := "{\"resources\":{\"vcore\":{\"value\":10000}}}"
- tags = map[string]string{"taskqueue": "same", siCommon.AppTagNamespaceResourceQuota: jsonMaxRes}
+ tags = map[string]string{
+ "taskqueue": "same",
+ siCommon.AppTagNamespaceResourceQuota: jsonMaxRes,
+ siCommon.AppTagNamespaceResourceMaxApps: "1",
+ }
app = newApplicationTGTags(appID2, "default", "unknown", tgRes, tags)
err = partition.AddApplication(app)
assert.NilError(t, err, "app-2 should have been added to the partition")
@@ -2874,10 +2882,16 @@
assert.Assert(t, resources.Equals(maxRes, resources.NewResourceFromMap(map[string]resources.Quantity{
"vcore": 10000,
})), "max resource set on the queue does not match the JSON tag")
+ assert.Equal(t, queue.GetMaxApps(), uint64(1), "max running apps should be 1")
jsonMaxRes = "{\"resources\":{\"vcore\":{\"value\":1000}}}"
jsonGuaranteedRes := "{\"resources\":{\"vcore\":{\"value\":111}}}"
- tags = map[string]string{"taskqueue": "smaller", siCommon.AppTagNamespaceResourceQuota: jsonMaxRes, siCommon.AppTagNamespaceResourceGuaranteed: jsonGuaranteedRes}
+ tags = map[string]string{
+ "taskqueue": "smaller",
+ siCommon.AppTagNamespaceResourceQuota: jsonMaxRes,
+ siCommon.AppTagNamespaceResourceGuaranteed: jsonGuaranteedRes,
+ siCommon.AppTagNamespaceResourceMaxApps: "1",
+ }
app = newApplicationTGTags(appID3, "default", "unknown", tgRes, tags)
err = partition.AddApplication(app)
if err == nil {
@@ -2900,6 +2914,7 @@
assert.Assert(t, resources.Equals(guaranteedRes, resources.NewResourceFromMap(map[string]resources.Quantity{
"vcore": 111,
})), "guaranteed resource set on the queue does not match the JSON tag")
+ assert.Equal(t, queue.GetMaxApps(), uint64(1), "max running apps should be 1")
}
func TestPlaceholderSmallerThanReal(t *testing.T) {
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index 96e6d8e..ea1192c 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -392,6 +392,7 @@
Resources: configs.Resources{
Max: resLimit,
},
+ MaxApplications: 2,
Limits: []configs.Limit{
{
Limit: "limited queue limit",