jobStatuses use appId as key (#1686)

diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultK8sFlinkTrackMonitor.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultK8sFlinkTrackMonitor.scala
index 86faca1..af265f7 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultK8sFlinkTrackMonitor.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultK8sFlinkTrackMonitor.scala
@@ -77,9 +77,9 @@
 
   override def getJobStatus(trackId: TrackId): Option[JobStatusCV] = Option(trackController.jobStatuses.get(trackId))
 
-  override def getJobStatus(trackIds: Set[TrackId]): Map[TrackId, JobStatusCV] = trackController.jobStatuses.getAsMap(trackIds)
+  override def getJobStatus(trackIds: Set[TrackId]): Map[CacheKey, JobStatusCV] = trackController.jobStatuses.getAsMap(trackIds)
 
-  override def getAllJobStatus: Map[TrackId, JobStatusCV] = trackController.jobStatuses.asMap()
+  override def getAllJobStatus: Map[CacheKey, JobStatusCV] = trackController.jobStatuses.asMap()
 
   override def getAccClusterMetrics: FlinkMetricCV = trackController.collectAccMetric()
 
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkTrackController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkTrackController.scala
index ca89aaa..1708858 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkTrackController.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkTrackController.scala
@@ -89,7 +89,9 @@
     collectTracks() match {
       case k if k.isEmpty => FlinkMetricCV.empty
       case k =>
-        flinkMetrics.getAll(for (elem <- k) yield {ClusterKey.of(elem)}) match {
+        flinkMetrics.getAll(for (elem <- k) yield {
+          ClusterKey.of(elem)
+        }) match {
           case m if m.isEmpty => FlinkMetricCV.empty
           case m =>
             // aggregate metrics
@@ -119,10 +121,9 @@
 }
 
 //----cache----
+case class CacheKey(key: java.lang.Long) extends Serializable
 
 class TrackIdCache {
-  case class CacheKey(key: java.lang.Long) extends Serializable
-
   private[this] lazy val cache: Cache[CacheKey, TrackId] = Caffeine.newBuilder.build()
 
   def update(k: TrackId): Unit = {
@@ -153,19 +154,19 @@
 
 class JobStatusCache {
 
-  private[this] lazy val cache: Cache[TrackId, JobStatusCV] = Caffeine.newBuilder.build()
+  private[this] lazy val cache: Cache[CacheKey, JobStatusCV] = Caffeine.newBuilder.build()
 
-  def putAll(kvs: Map[TrackId, JobStatusCV]): Unit = cache.putAll(kvs)
+  def putAll(kvs: Map[TrackId, JobStatusCV]): Unit = cache.putAll(kvs.map(t => (CacheKey(t._1.appId), t._2)))
 
-  def put(k: TrackId, v: JobStatusCV): Unit = cache.put(k, v)
+  def put(k: TrackId, v: JobStatusCV): Unit = cache.put(CacheKey(k.appId), v)
 
-  def asMap(): Map[TrackId, JobStatusCV] = cache.asMap().toMap
+  def asMap(): Map[CacheKey, JobStatusCV] = cache.asMap().toMap
 
-  def getAsMap(trackIds: Set[TrackId]): Map[TrackId, JobStatusCV] = cache.getAllPresent(trackIds).toMap
+  def getAsMap(trackIds: Set[TrackId]): Map[CacheKey, JobStatusCV] = cache.getAllPresent(trackIds.map(t => CacheKey(t.appId))).toMap
 
-  def get(trackId: TrackId): JobStatusCV = cache.getIfPresent(trackId)
+  def get(k: TrackId): JobStatusCV = cache.getIfPresent(CacheKey(k.appId))
 
-  def invalidate(trackId: TrackId): Unit = cache.invalidate(trackId)
+  def invalidate(k: TrackId): Unit = cache.invalidate(CacheKey(k.appId))
 
   def cleanUp(): Unit = cache.cleanUp()
 
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/K8sFlinkTrackMonitor.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/K8sFlinkTrackMonitor.scala
index 253419a..e358f32 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/K8sFlinkTrackMonitor.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/K8sFlinkTrackMonitor.scala
@@ -94,12 +94,12 @@
   /**
    * get flink status
    */
-  def getJobStatus(trackIds: Set[TrackId]): Map[TrackId, JobStatusCV]
+  def getJobStatus(trackIds: Set[TrackId]): Map[CacheKey, JobStatusCV]
 
   /**
    * get all flink status in tracking result pool
    */
-  def getAllJobStatus: Map[TrackId, JobStatusCV]
+  def getAllJobStatus: Map[CacheKey, JobStatusCV]
 
   /**
    * get flink cluster metrics aggregation
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/K8sFlinkTrackMonitorLazyStartAop.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/K8sFlinkTrackMonitorLazyStartAop.scala
index 0eae8b4..de48e14 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/K8sFlinkTrackMonitorLazyStartAop.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/K8sFlinkTrackMonitorLazyStartAop.scala
@@ -53,12 +53,12 @@
     super.getJobStatus(trackId)
   }
 
-  abstract override def getJobStatus(trackIds: Set[TrackId]): Map[TrackId, JobStatusCV] = {
+  abstract override def getJobStatus(trackIds: Set[TrackId]): Map[CacheKey, JobStatusCV] = {
     start()
     super.getJobStatus(trackIds)
   }
 
-  abstract override def getAllJobStatus: Map[TrackId, JobStatusCV] = {
+  abstract override def getAllJobStatus: Map[CacheKey, JobStatusCV] = {
     start()
     super.getAllJobStatus
   }