[YUNIKORN-1190] Account for usage of pods without applicationId (#413)

In plugin mode pods could be scheduled by the default scheduler and not
accounted for in the node usage. This happens for pods without an
application ID but with the scheduler set to YuniKorn.

Closes: #413

Signed-off-by: Wilfred Spiegelenburg <wilfreds@apache.org>
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index e07995d..80b3444 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -267,7 +267,11 @@
 func (ctx *Context) filterPods(obj interface{}) bool {
 	switch obj := obj.(type) {
 	case *v1.Pod:
-		return utils.GeneralPodFilter(obj)
+		if utils.GeneralPodFilter(obj) {
+			_, err := utils.GetApplicationIDFromPod(obj)
+			return err == nil
+		}
+		return false
 	default:
 		return false
 	}
diff --git a/pkg/cache/context_recovery.go b/pkg/cache/context_recovery.go
index 79e99d1..73b9947 100644
--- a/pkg/cache/context_recovery.go
+++ b/pkg/cache/context_recovery.go
@@ -104,7 +104,10 @@
 				continue
 			}
 			// yunikorn scheduled pods add to existing allocations
-			if utils.GeneralPodFilter(&pod) {
+			_, err = utils.GetApplicationIDFromPod(&pod)
+			ykPod := utils.GeneralPodFilter(&pod) && err == nil
+			switch {
+			case ykPod:
 				if existingAlloc := getExistingAllocation(mgr, &pod); existingAlloc != nil {
 					log.Logger().Debug("Adding resources for existing pod",
 						zap.String("appID", existingAlloc.ApplicationID),
@@ -123,7 +126,7 @@
 						zap.String("nodeName", pod.Spec.NodeName),
 						zap.Stringer("resources", common.GetPodResource(&pod)))
 				}
-			} else if !utils.IsPodTerminated(&pod) {
+			case !utils.IsPodTerminated(&pod):
 				// pod is not terminated (succeed or failed) state,
 				// and it has a node assigned, that means the scheduler
 				// has already allocated the pod onto a node
@@ -141,7 +144,7 @@
 				occupiedResource = common.Add(occupiedResource, podResource)
 				nodeOccupiedResources[pod.Spec.NodeName] = occupiedResource
 				ctx.nodes.cache.AddPod(&pod)
-			} else {
+			default:
 				log.Logger().Debug("Skipping terminated pod",
 					zap.String("podUID", string(pod.UID)),
 					zap.String("podName", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)))
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index f2a81cd..58b3908 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -263,9 +263,22 @@
 		},
 		Spec: v1.PodSpec{SchedulerName: "default-scheduler"},
 	}
+	pod3 := &v1.Pod{
+		TypeMeta: apis.TypeMeta{
+			Kind:       "Pod",
+			APIVersion: "v1",
+		},
+		ObjectMeta: apis.ObjectMeta{
+			Name:   "yunikorn-test-00003",
+			UID:    "UID-00003",
+			Labels: map[string]string{"applicationId": "test-00003"},
+		},
+		Spec: v1.PodSpec{SchedulerName: "yunikorn"},
+	}
 	assert.Check(t, !context.filterPods(nil), "nil object was allowed")
-	assert.Check(t, context.filterPods(pod1), "yunikorn-managed pod was filtered")
+	assert.Check(t, !context.filterPods(pod1), "yunikorn-managed pod with no app id was allowed")
 	assert.Check(t, !context.filterPods(pod2), "non-yunikorn-managed pod was allowed")
+	assert.Check(t, context.filterPods(pod3), "yunikorn-managed pod was filtered")
 }
 
 func TestAddPodToCache(t *testing.T) {
diff --git a/pkg/cache/node_coordinator.go b/pkg/cache/node_coordinator.go
index 78dea3e..df0e048 100644
--- a/pkg/cache/node_coordinator.go
+++ b/pkg/cache/node_coordinator.go
@@ -48,10 +48,13 @@
 
 // filter pods that not scheduled by us
 func (c *nodeResourceCoordinator) filterPods(obj interface{}) bool {
-	switch obj.(type) {
+	switch obj := obj.(type) {
 	case *v1.Pod:
-		pod := obj.(*v1.Pod)
-		return !utils.GeneralPodFilter(pod)
+		if utils.GeneralPodFilter(obj) {
+			_, err := utils.GetApplicationIDFromPod(obj)
+			return err != nil
+		}
+		return true
 	default:
 		return false
 	}
diff --git a/pkg/cache/node_coordinator_test.go b/pkg/cache/node_coordinator_test.go
index fe9fee5..2f6f3b6 100644
--- a/pkg/cache/node_coordinator_test.go
+++ b/pkg/cache/node_coordinator_test.go
@@ -23,6 +23,7 @@
 
 	"gotest.tools/assert"
 	v1 "k8s.io/api/core/v1"
+	apis "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/apache/yunikorn-k8shim/pkg/common/constants"
 	"github.com/apache/yunikorn-k8shim/pkg/common/utils"
@@ -295,3 +296,50 @@
 	coordinator.deletePod(pod2)
 	assert.Equal(t, executed, false)
 }
+
+func TestNodeCoordinatorFilterPods(t *testing.T) {
+	mockedSchedulerAPI := newMockSchedulerAPI()
+	nodes := newSchedulerNodes(mockedSchedulerAPI, NewTestSchedulerCache())
+	host1 := utils.NodeForTest(Host1, "10G", "10")
+	nodes.addNode(host1)
+	coordinator := newNodeResourceCoordinator(nodes)
+
+	pod1 := &v1.Pod{
+		TypeMeta: apis.TypeMeta{
+			Kind:       "Pod",
+			APIVersion: "v1",
+		},
+		ObjectMeta: apis.ObjectMeta{
+			Name: "yunikorn-test-00001",
+			UID:  "UID-00001",
+		},
+		Spec: v1.PodSpec{SchedulerName: "yunikorn"},
+	}
+	pod2 := &v1.Pod{
+		TypeMeta: apis.TypeMeta{
+			Kind:       "Pod",
+			APIVersion: "v1",
+		},
+		ObjectMeta: apis.ObjectMeta{
+			Name: "yunikorn-test-00002",
+			UID:  "UID-00002",
+		},
+		Spec: v1.PodSpec{SchedulerName: "default-scheduler"},
+	}
+	pod3 := &v1.Pod{
+		TypeMeta: apis.TypeMeta{
+			Kind:       "Pod",
+			APIVersion: "v1",
+		},
+		ObjectMeta: apis.ObjectMeta{
+			Name:   "yunikorn-test-00003",
+			UID:    "UID-00003",
+			Labels: map[string]string{"applicationId": "test-00003"},
+		},
+		Spec: v1.PodSpec{SchedulerName: "yunikorn"},
+	}
+	assert.Check(t, !coordinator.filterPods(nil), "nil object was allowed")
+	assert.Check(t, coordinator.filterPods(pod1), "yunikorn-managed pod with no app id was filtered")
+	assert.Check(t, coordinator.filterPods(pod2), "non-yunikorn-managed pod was filtered")
+	assert.Check(t, !coordinator.filterPods(pod3), "yunikorn-managed pod was allowed")
+}