[Improve][Flink-Kubnernetes-V2] Improve UsingObserver test (#3654)
diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
index e93d190..40080e6 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
@@ -29,11 +29,11 @@
class UsingObserver extends AnyWordSpecLike with BeforeAndAfterAll {
- "Track and get flink job snapshot." in unsafeRun {
+ "Track and get flink application job snapshot." in unsafeRun {
for {
// track resource
_ <- ZIO.unit
- trackId = TrackKey.appJob(233, "fdev", "simple-appjob")
+ trackId = TrackKey.appJob(114514, "fdev", "simple-appjob")
_ <- FlinkK8sObserver.track(trackId)
// get job snapshot
_ <- ZIO.sleep(3.seconds)
@@ -42,6 +42,35 @@
} yield ()
}
+ "Track and get flink application endpoint snapshot." in unsafeRun {
+ for {
+ // track resource
+ _ <- ZIO.unit
+ trackId = TrackKey.appJob(233, "fdev", "simple-appjob")
+ _ <- FlinkK8sObserver.track(trackId)
+ // get job rest endpoint
+ _ <- ZIO.sleep(3.seconds)
+ jobSnap <- FlinkK8sObserver.restSvcEndpointSnaps.get(trackId.namespace, trackId.name)
+ _ <- Console.printLine(jobSnap.prettyStr)
+ } yield ()
+ }
+
+ "Track and get flink seesion job snapshot and endpoint." in unsafeRun {
+ for {
+ // track resource
+ _ <- ZIO.unit
+ trackId = TrackKey.sessionJob(233, "fdev", "simple-sessionjob", "simple-session")
+ _ <- FlinkK8sObserver.track(trackId)
+ // get job rest endpoint
+ _ <- ZIO.sleep(3.seconds)
+ endpoint <- FlinkK8sObserver.restSvcEndpointSnaps.get(trackId.namespace, trackId.name)
+ _ <- Console.printLine(endpoint.prettyStr)
+ // get job snapshot
+ jobSnap <- FlinkK8sObserver.evaluatedJobSnaps.getValue(trackId.id)
+ _ <- Console.printLine(jobSnap.prettyStr)
+ } yield ()
+ }
+
"Track and get flink cluster metrics" in unsafeRun {
for {
// track resource
@@ -66,9 +95,11 @@
} yield ()
}
- "Only subscribe Flink job state changes." in unsafeRun {
+ "Only subscribe Flink application job state changes." in unsafeRun {
for {
+ // track resource
_ <- FlinkK8sObserver.track(TrackKey.appJob(234, "fdev", "simple-appjob"))
+ // subscribe job status changes
_ <- FlinkK8sObserver.evaluatedJobSnaps
.flatSubscribe()
.map { case (appId, status) => (appId, status.evalState) }
@@ -78,9 +109,11 @@
} yield ()
}
- "Only subscribe Flink job enpoint changes." in unsafeRun {
+ "Only subscribe Flink application job enpoint changes." in unsafeRun {
for {
+ // track resource
_ <- FlinkK8sObserver.track(TrackKey.appJob(234, "fdev", "simple-appjob"))
+ // subscribe job status changes
_ <- FlinkK8sObserver.restSvcEndpointSnaps
.flatSubscribe()
.map { case (appId, status) => (appId, status.ipRest) }