[YUNIKORN-1697][core] Make namespace annotation to support max applications update. (#927)

fix golint
Address new comments
Fix test
fix logic
Increase test coverage
Split testing
Fix unit test
fix
increase coverage
Merge remote-tracking branch 'upstream/master' into YUNIKORN-1697
Add test coverage
Address new comments
Merge remote-tracking branch 'upstream/master' into YUNIKORN-1697
Address new comments
Merge remote-tracking branch 'upstream/master' into YUNIKORN-1697

Closes: #927

Signed-off-by: qzhu <qzhu@cloudera.com>
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index 504b6b0..350abf3 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -22,6 +22,7 @@
 	"context"
 	"fmt"
 	"math"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -2157,6 +2158,23 @@
 	return sa.getResourceFromTags(siCommon.AppTagNamespaceResourceQuota)
 }
 
+// GetMaxApps returns the max apps that is set in the application tags
+func (sa *Application) GetMaxApps() uint64 {
+	return sa.getUint64Tag(siCommon.AppTagNamespaceResourceMaxApps)
+}
+
+func (sa *Application) getUint64Tag(tag string) uint64 {
+	uintValue, err := strconv.ParseUint(sa.GetTag(tag), 10, 64)
+	if err != nil {
+		log.Log(log.SchedApplication).Warn("application tag conversion failure",
+			zap.String("tag", tag),
+			zap.String("json string", sa.GetTag(tag)),
+			zap.Error(err))
+		return 0
+	}
+	return uintValue
+}
+
 func (sa *Application) getResourceFromTags(tag string) *resources.Resource {
 	value := sa.GetTag(tag)
 	if value == "" {
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 3d2008f..9dfcdc5 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -108,46 +108,120 @@
 	assert.Assert(t, app.IsNew(), "new application must be in new state")
 	assert.Equal(t, app.execTimeout, defaultPlaceholderTimeout, "no timeout passed in should be modified default")
 	assert.Assert(t, resources.Equals(app.placeholderAsk, res), "placeholder ask not set as expected")
+}
 
+func TestNewApplicationWithAnnotationUpdate(t *testing.T) {
+	user := security.UserGroup{
+		User:   "testuser",
+		Groups: []string{},
+	}
 	// valid tags
-	siApp = &si.AddApplicationRequest{}
+	siApp := &si.AddApplicationRequest{}
 	siApp.Tags = map[string]string{
 		siCommon.AppTagNamespaceResourceQuota:      "{\"resources\":{\"validMaxRes\":{\"value\":11}}}",
 		siCommon.AppTagNamespaceResourceGuaranteed: "{\"resources\":{\"validGuaranteed\":{\"value\":22}}}",
+		siCommon.AppTagNamespaceResourceMaxApps:    "33",
 	}
-	app = NewApplication(siApp, user, nil, "")
+
+	app := NewApplication(siApp, user, nil, "")
+
 	guaranteed := app.GetGuaranteedResource()
 	maxResource := app.GetMaxResource()
+	maxApps := app.GetMaxApps()
 	assert.Assert(t, guaranteed != nil, "guaranteed resource has not been set")
 	assert.Equal(t, 1, len(guaranteed.Resources), "more than one resource has been set")
 	assert.Equal(t, resources.Quantity(22), guaranteed.Resources["validGuaranteed"])
 	assert.Assert(t, maxResource != nil, "maximum resource has not been set")
 	assert.Equal(t, 1, len(maxResource.Resources), "more than one resource has been set")
 	assert.Equal(t, resources.Quantity(11), maxResource.Resources["validMaxRes"], "maximum resource is incorrect")
+	assert.Assert(t, maxApps != 0, "maximum apps has not been set or incorrect")
+	assert.Equal(t, uint64(33), maxApps, "maximum apps is incorrect")
+
+	// valid tags without max apps
+	siApp = &si.AddApplicationRequest{}
+	siApp.Tags = map[string]string{
+		siCommon.AppTagNamespaceResourceQuota:      "{\"resources\":{\"validMaxRes\":{\"value\":11}}}",
+		siCommon.AppTagNamespaceResourceGuaranteed: "{\"resources\":{\"validGuaranteed\":{\"value\":22}}}",
+	}
+	app = NewApplication(siApp, user, nil, "")
+	guaranteed = app.GetGuaranteedResource()
+	maxResource = app.GetMaxResource()
+	maxApps = app.GetMaxApps()
+	assert.Assert(t, guaranteed != nil, "guaranteed resource has not been set")
+	assert.Equal(t, 1, len(guaranteed.Resources), "more than one resource has been set")
+	assert.Equal(t, resources.Quantity(22), guaranteed.Resources["validGuaranteed"])
+	assert.Assert(t, maxResource != nil, "maximum resource has not been set")
+	assert.Equal(t, 1, len(maxResource.Resources), "more than one resource has been set")
+	assert.Equal(t, resources.Quantity(11), maxResource.Resources["validMaxRes"], "maximum resource is incorrect")
+	assert.Assert(t, maxApps == 0, "maximum apps should have not been set")
 
 	// invalid tags
 	siApp = &si.AddApplicationRequest{}
 	siApp.Tags = map[string]string{
 		siCommon.AppTagNamespaceResourceQuota:      "{xxxxxx}",
 		siCommon.AppTagNamespaceResourceGuaranteed: "{yyyyy}",
+		siCommon.AppTagNamespaceResourceMaxApps:    "zzzzz",
 	}
 	app = NewApplication(siApp, user, nil, "")
 	guaranteed = app.GetGuaranteedResource()
 	maxResource = app.GetMaxResource()
+	maxApps = app.GetMaxApps()
 	assert.Assert(t, guaranteed == nil, "guaranteed resource should have not been set")
 	assert.Assert(t, maxResource == nil, "maximum resource should have not been set")
+	assert.Assert(t, maxApps == 0, "maximum apps should have not been set or incorrect")
 
 	// negative values
 	siApp = &si.AddApplicationRequest{}
 	siApp.Tags = map[string]string{
 		siCommon.AppTagNamespaceResourceQuota:      "{\"resources\":{\"negativeMax\":{\"value\":-11}}}",
 		siCommon.AppTagNamespaceResourceGuaranteed: "{\"resources\":{\"negativeGuaranteed\":{\"value\":-22}}}",
+		siCommon.AppTagNamespaceResourceMaxApps:    "-33",
 	}
 	app = NewApplication(siApp, user, nil, "")
 	guaranteed = app.GetGuaranteedResource()
 	maxResource = app.GetMaxResource()
+	maxApps = app.GetMaxApps()
 	assert.Assert(t, guaranteed == nil, "guaranteed resource should have not been set")
 	assert.Assert(t, maxResource == nil, "maximum resource should have not been set")
+	assert.Assert(t, maxApps == 0, "maximum apps should have not been set or incorrect")
+
+	// valid max apps
+	siApp = &si.AddApplicationRequest{}
+	siApp.Tags = map[string]string{
+		siCommon.AppTagNamespaceResourceMaxApps: "33",
+	}
+	app = NewApplication(siApp, user, nil, "")
+	maxApps = app.GetMaxApps()
+	guaranteed = app.GetGuaranteedResource()
+	maxResource = app.GetMaxResource()
+	assert.Assert(t, guaranteed == nil, "guaranteed resource should have not been set")
+	assert.Assert(t, maxResource == nil, "maximum resource should have not been set")
+	assert.Assert(t, maxApps != 0, "maximum apps has not been set or incorrect")
+	assert.Equal(t, uint64(33), maxApps, "maximum apps is incorrect")
+
+	// invalid max apps
+	siApp = &si.AddApplicationRequest{}
+	siApp.Tags = map[string]string{
+		siCommon.AppTagNamespaceResourceMaxApps: "zzzzz",
+	}
+	app = NewApplication(siApp, user, nil, "")
+	maxApps = app.GetMaxApps()
+	maxResource = app.GetMaxResource()
+	assert.Assert(t, guaranteed == nil, "guaranteed resource should have not been set")
+	assert.Assert(t, maxResource == nil, "maximum resource should have not been set")
+	assert.Assert(t, maxApps == 0, "maximum apps should have not been set or incorrect")
+
+	// negative max apps
+	siApp = &si.AddApplicationRequest{}
+	siApp.Tags = map[string]string{
+		siCommon.AppTagNamespaceResourceMaxApps: "-33",
+	}
+	app = NewApplication(siApp, user, nil, "")
+	maxApps = app.GetMaxApps()
+	maxResource = app.GetMaxResource()
+	assert.Assert(t, guaranteed == nil, "guaranteed resource should have not been set")
+	assert.Assert(t, maxResource == nil, "maximum resource should have not been set")
+	assert.Assert(t, maxApps == 0, "maximum apps should have not been set or incorrect")
 }
 
 // test basic reservations
@@ -2799,3 +2873,33 @@
 	defer sa.Unlock()
 	sa.appEvents = schedEvt.NewApplicationEvents(events.GetEventSystem())
 }
+
+func TestGetUint64Tag(t *testing.T) {
+	app := &Application{
+		tags: map[string]string{
+			"validUintTag":    "12345",
+			"negativeUintTag": "-12345",
+			"invalidUintTag":  "not-a-number",
+			"emptyUintTag":    "",
+		},
+	}
+
+	tests := []struct {
+		name     string
+		tag      string
+		expected uint64
+	}{
+		{"Valid uint64 tag", "validUintTag", uint64(12345)},
+		{"Negative uint64 tag", "negativeUintTag", uint64(0)},
+		{"Invalid uint64 tag", "invalidUintTag", uint64(0)},
+		{"Empty tag", "emptyUintTag", uint64(0)},
+		{"Non-existent tag", "nonExistentTag", uint64(0)},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := app.getUint64Tag(tt.tag)
+			assert.Equal(t, tt.expected, result)
+		})
+	}
+}
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 71b12e8..cb11c90 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -388,7 +388,6 @@
 			zap.Error(err))
 		return err
 	}
-
 	sq.setResources(guaranteedResource, maxResource)
 	return nil
 }
@@ -453,6 +452,16 @@
 	sq.setResources(guaranteedResource, maxResource)
 }
 
+// SetMaxRunningApps allows setting the maximum running apps on a queue
+func (sq *Queue) SetMaxRunningApps(maxApps uint64) {
+	if sq == nil {
+		return
+	}
+	sq.Lock()
+	defer sq.Unlock()
+	sq.maxRunningApps = maxApps
+}
+
 // setTemplate sets the template on the queue based on the config.
 // lock free call, must be called holding the queue lock or during create only
 func (sq *Queue) setTemplate(conf configs.ChildTemplate) error {
@@ -601,7 +610,14 @@
 	return sq.guaranteedResource
 }
 
-// GetActualGuaranteedResource returns the actual (including parent) guaranteed resources for the queue.
+// GetMaxApps returns the maximum number of applications that can run in this queue.
+func (sq *Queue) GetMaxApps() uint64 {
+	sq.RLock()
+	defer sq.RUnlock()
+	return sq.maxRunningApps
+}
+
+// GetActualGuaranteedResources returns the actual (including parent) guaranteed resources for the queue.
 func (sq *Queue) GetActualGuaranteedResource() *resources.Resource {
 	if sq == nil {
 		return resources.NewResource()
@@ -1683,20 +1699,6 @@
 	return sq.preemptionPolicy
 }
 
-// SetMaxRunningApps allows setting the maximum running apps on a queue
-// test only
-func (sq *Queue) SetMaxRunningApps(max int) {
-	if sq == nil {
-		return
-	}
-	if max < 0 {
-		return
-	}
-	sq.Lock()
-	defer sq.Unlock()
-	sq.maxRunningApps = uint64(max)
-}
-
 // FindEligiblePreemptionVictims is used to locate tasks which may be preempted for the given ask.
 // queuePath is the fully-qualified path of the queue where ask resides
 // ask is the ask we are attempting to preempt for
diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go
index cd0700c..c0792d5 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -2611,6 +2611,22 @@
 	}
 }
 
+func TestQueueSetMaxRunningApps(t *testing.T) {
+	defer func() {
+		if r := recover(); r != nil {
+			t.Fatal("panic on nil queue setMaxRunningApps")
+		}
+	}()
+	queue := &Queue{}
+	maxApps := uint64(10)
+
+	queue.SetMaxRunningApps(maxApps)
+	assert.Equal(t, maxApps, queue.maxRunningApps)
+
+	queue = nil
+	queue.SetMaxRunningApps(maxApps)
+}
+
 func TestQueue_allocatedResFits_Other(t *testing.T) {
 	const first = "first"
 	const second = "second"
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index a614ee4..3a26182 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -342,13 +342,19 @@
 
 	guaranteedRes := app.GetGuaranteedResource()
 	maxRes := app.GetMaxResource()
-	if !isRecoveryQueue && (guaranteedRes != nil || maxRes != nil) {
+	maxApps := app.GetMaxApps()
+	if !isRecoveryQueue && (guaranteedRes != nil || maxRes != nil || maxApps != 0) {
 		// set resources based on tags, but only if the queue is dynamic (unmanaged)
 		if queue.IsManaged() {
 			log.Log(log.SchedQueue).Warn("Trying to set resources on a queue that is not an unmanaged leaf",
 				zap.String("queueName", queue.QueuePath))
 		} else {
-			queue.SetResources(guaranteedRes, maxRes)
+			if maxApps != 0 {
+				queue.SetMaxRunningApps(maxApps)
+			}
+			if guaranteedRes != nil || maxRes != nil {
+				queue.SetResources(guaranteedRes, maxRes)
+			}
 		}
 	}
 
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index f138d23..24f51fa 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -2808,6 +2808,7 @@
 	tags := map[string]string{
 		siCommon.AppTagNamespaceResourceGuaranteed: "{\"resources\":{\"vcore\":{\"value\":111}}}",
 		siCommon.AppTagNamespaceResourceQuota:      "{\"resources\":{\"vcore\":{\"value\":2222}}}",
+		siCommon.AppTagNamespaceResourceMaxApps:    "1",
 	}
 	app := newApplicationTGTags(appID1, "default", "root.limited", tgRes, tags)
 	err = partition.AddApplication(app)
@@ -2819,6 +2820,7 @@
 		"vcore": 1000,
 	})), "max resource changed unexpectedly")
 	assert.Assert(t, queue.GetGuaranteedResource() == nil)
+	assert.Equal(t, queue.GetMaxApps(), uint64(2), "max running apps should be 2")
 
 	// add a app with TG that does fit in the queue
 	limit = map[string]string{"vcore": "100"}
@@ -2832,6 +2834,7 @@
 		"vcore": 100000,
 	})), "max resource changed unexpectedly")
 	assert.Assert(t, queue.GetGuaranteedResource() == nil)
+	assert.Equal(t, queue.GetMaxApps(), uint64(2), "max running apps should be 2")
 
 	// add a app with TG that does fit in the queue as the resource is not limited in the queue
 	limit = map[string]string{"second": "100"}
@@ -2845,6 +2848,7 @@
 		"second": 100,
 	})), "max resource changed unexpectedly")
 	assert.Assert(t, queue.GetGuaranteedResource() == nil)
+	assert.Equal(t, queue.GetMaxApps(), uint64(2), "max running apps should be 2")
 }
 
 func TestAddTGAppDynamic(t *testing.T) {
@@ -2861,7 +2865,11 @@
 	assert.Equal(t, app.GetQueuePath(), "root.unlimited", "app-1 not placed in expected queue")
 
 	jsonMaxRes := "{\"resources\":{\"vcore\":{\"value\":10000}}}"
-	tags = map[string]string{"taskqueue": "same", siCommon.AppTagNamespaceResourceQuota: jsonMaxRes}
+	tags = map[string]string{
+		"taskqueue":                             "same",
+		siCommon.AppTagNamespaceResourceQuota:   jsonMaxRes,
+		siCommon.AppTagNamespaceResourceMaxApps: "1",
+	}
 	app = newApplicationTGTags(appID2, "default", "unknown", tgRes, tags)
 	err = partition.AddApplication(app)
 	assert.NilError(t, err, "app-2 should have been added to the partition")
@@ -2874,10 +2882,16 @@
 	assert.Assert(t, resources.Equals(maxRes, resources.NewResourceFromMap(map[string]resources.Quantity{
 		"vcore": 10000,
 	})), "max resource set on the queue does not match the JSON tag")
+	assert.Equal(t, queue.GetMaxApps(), uint64(1), "max running apps should be 1")
 
 	jsonMaxRes = "{\"resources\":{\"vcore\":{\"value\":1000}}}"
 	jsonGuaranteedRes := "{\"resources\":{\"vcore\":{\"value\":111}}}"
-	tags = map[string]string{"taskqueue": "smaller", siCommon.AppTagNamespaceResourceQuota: jsonMaxRes, siCommon.AppTagNamespaceResourceGuaranteed: jsonGuaranteedRes}
+	tags = map[string]string{
+		"taskqueue":                                "smaller",
+		siCommon.AppTagNamespaceResourceQuota:      jsonMaxRes,
+		siCommon.AppTagNamespaceResourceGuaranteed: jsonGuaranteedRes,
+		siCommon.AppTagNamespaceResourceMaxApps:    "1",
+	}
 	app = newApplicationTGTags(appID3, "default", "unknown", tgRes, tags)
 	err = partition.AddApplication(app)
 	if err == nil {
@@ -2900,6 +2914,7 @@
 	assert.Assert(t, resources.Equals(guaranteedRes, resources.NewResourceFromMap(map[string]resources.Quantity{
 		"vcore": 111,
 	})), "guaranteed resource set on the queue does not match the JSON tag")
+	assert.Equal(t, queue.GetMaxApps(), uint64(1), "max running apps should be 1")
 }
 
 func TestPlaceholderSmallerThanReal(t *testing.T) {
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index 96e6d8e..ea1192c 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -392,6 +392,7 @@
 						Resources: configs.Resources{
 							Max: resLimit,
 						},
+						MaxApplications: 2,
 						Limits: []configs.Limit{
 							{
 								Limit: "limited queue limit",