jobStatuses use appId as key (#1686)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultK8sFlinkTrackMonitor.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultK8sFlinkTrackMonitor.scala
index 86faca1..af265f7 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultK8sFlinkTrackMonitor.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/DefaultK8sFlinkTrackMonitor.scala
@@ -77,9 +77,9 @@
override def getJobStatus(trackId: TrackId): Option[JobStatusCV] = Option(trackController.jobStatuses.get(trackId))
- override def getJobStatus(trackIds: Set[TrackId]): Map[TrackId, JobStatusCV] = trackController.jobStatuses.getAsMap(trackIds)
+ override def getJobStatus(trackIds: Set[TrackId]): Map[CacheKey, JobStatusCV] = trackController.jobStatuses.getAsMap(trackIds)
- override def getAllJobStatus: Map[TrackId, JobStatusCV] = trackController.jobStatuses.asMap()
+ override def getAllJobStatus: Map[CacheKey, JobStatusCV] = trackController.jobStatuses.asMap()
override def getAccClusterMetrics: FlinkMetricCV = trackController.collectAccMetric()
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkTrackController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkTrackController.scala
index ca89aaa..1708858 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkTrackController.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkTrackController.scala
@@ -89,7 +89,9 @@
collectTracks() match {
case k if k.isEmpty => FlinkMetricCV.empty
case k =>
- flinkMetrics.getAll(for (elem <- k) yield {ClusterKey.of(elem)}) match {
+ flinkMetrics.getAll(for (elem <- k) yield {
+ ClusterKey.of(elem)
+ }) match {
case m if m.isEmpty => FlinkMetricCV.empty
case m =>
// aggregate metrics
@@ -119,10 +121,9 @@
}
//----cache----
+case class CacheKey(key: java.lang.Long) extends Serializable
class TrackIdCache {
- case class CacheKey(key: java.lang.Long) extends Serializable
-
private[this] lazy val cache: Cache[CacheKey, TrackId] = Caffeine.newBuilder.build()
def update(k: TrackId): Unit = {
@@ -153,19 +154,19 @@
class JobStatusCache {
- private[this] lazy val cache: Cache[TrackId, JobStatusCV] = Caffeine.newBuilder.build()
+ private[this] lazy val cache: Cache[CacheKey, JobStatusCV] = Caffeine.newBuilder.build()
- def putAll(kvs: Map[TrackId, JobStatusCV]): Unit = cache.putAll(kvs)
+ def putAll(kvs: Map[TrackId, JobStatusCV]): Unit = cache.putAll(kvs.map(t => (CacheKey(t._1.appId), t._2)))
- def put(k: TrackId, v: JobStatusCV): Unit = cache.put(k, v)
+ def put(k: TrackId, v: JobStatusCV): Unit = cache.put(CacheKey(k.appId), v)
- def asMap(): Map[TrackId, JobStatusCV] = cache.asMap().toMap
+ def asMap(): Map[CacheKey, JobStatusCV] = cache.asMap().toMap
- def getAsMap(trackIds: Set[TrackId]): Map[TrackId, JobStatusCV] = cache.getAllPresent(trackIds).toMap
+ def getAsMap(trackIds: Set[TrackId]): Map[CacheKey, JobStatusCV] = cache.getAllPresent(trackIds.map(t => t.appId)).toMap
- def get(trackId: TrackId): JobStatusCV = cache.getIfPresent(trackId)
+ def get(k: TrackId): JobStatusCV = cache.getIfPresent(CacheKey(k.appId))
- def invalidate(trackId: TrackId): Unit = cache.invalidate(trackId)
+ def invalidate(k: TrackId): Unit = cache.invalidate(CacheKey(k.appId))
def cleanUp(): Unit = cache.cleanUp()
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/K8sFlinkTrackMonitor.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/K8sFlinkTrackMonitor.scala
index 253419a..e358f32 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/K8sFlinkTrackMonitor.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/K8sFlinkTrackMonitor.scala
@@ -94,12 +94,12 @@
/**
* get flink status
*/
- def getJobStatus(trackIds: Set[TrackId]): Map[TrackId, JobStatusCV]
+ def getJobStatus(trackIds: Set[TrackId]): Map[CacheKey, JobStatusCV]
/**
* get all flink status in tracking result pool
*/
- def getAllJobStatus: Map[TrackId, JobStatusCV]
+ def getAllJobStatus: Map[CacheKey, JobStatusCV]
/**
* get flink cluster metrics aggregation
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/K8sFlinkTrackMonitorLazyStartAop.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/K8sFlinkTrackMonitorLazyStartAop.scala
index 0eae8b4..de48e14 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/K8sFlinkTrackMonitorLazyStartAop.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/K8sFlinkTrackMonitorLazyStartAop.scala
@@ -53,12 +53,12 @@
super.getJobStatus(trackId)
}
- abstract override def getJobStatus(trackIds: Set[TrackId]): Map[TrackId, JobStatusCV] = {
+ abstract override def getJobStatus(trackIds: Set[TrackId]): Map[CacheKey, JobStatusCV] = {
start()
super.getJobStatus(trackIds)
}
- abstract override def getAllJobStatus: Map[TrackId, JobStatusCV] = {
+ abstract override def getAllJobStatus: Map[CacheKey, JobStatusCV] = {
start()
super.getAllJobStatus
}