[Improve][Flink-Kubnernetes-V2] Improve UsingObserver test (#3654)

diff --git a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
index e93d190..40080e6 100644
--- a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
+++ b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingObserver.scala
@@ -29,11 +29,11 @@
 
 class UsingObserver extends AnyWordSpecLike with BeforeAndAfterAll {
 
-  "Track and get flink job snapshot." in unsafeRun {
+  "Track and get flink application job snapshot." in unsafeRun {
     for {
       // track resource
       _       <- ZIO.unit
-      trackId  = TrackKey.appJob(233, "fdev", "simple-appjob")
+      trackId  = TrackKey.appJob(114514, "fdev", "simple-appjob")
       _       <- FlinkK8sObserver.track(trackId)
       // get job snapshot
       _       <- ZIO.sleep(3.seconds)
@@ -42,6 +42,35 @@
     } yield ()
   }
 
+  "Track and get flink application endpoint snapshot." in unsafeRun {
+    for {
+      // track resource
+      _       <- ZIO.unit
+      trackId  = TrackKey.appJob(233, "fdev", "simple-appjob")
+      _       <- FlinkK8sObserver.track(trackId)
+      // get job rest endpoint
+      _       <- ZIO.sleep(3.seconds)
+      jobSnap <- FlinkK8sObserver.restSvcEndpointSnaps.get(trackId.namespace, trackId.name)
+      _       <- Console.printLine(jobSnap.prettyStr)
+    } yield ()
+  }
+
+  "Track and get flink seesion job snapshot and endpoint." in unsafeRun {
+    for {
+      // track resource
+      _        <- ZIO.unit
+      trackId   = TrackKey.sessionJob(233, "fdev", "simple-sessionjob", "simple-session")
+      _        <- FlinkK8sObserver.track(trackId)
+      // get job rest endpoint
+      _        <- ZIO.sleep(3.seconds)
+      endpoint <- FlinkK8sObserver.restSvcEndpointSnaps.get(trackId.namespace, trackId.name)
+      _        <- Console.printLine(endpoint.prettyStr)
+      // get job snapshot
+      jobSnap  <- FlinkK8sObserver.evaluatedJobSnaps.getValue(trackId.id)
+      _        <- Console.printLine(jobSnap.prettyStr)
+    } yield ()
+  }
+
   "Track and get flink cluster metrics" in unsafeRun {
     for {
       // track resource
@@ -66,9 +95,11 @@
     } yield ()
   }
 
-  "Only subscribe Flink job state changes." in unsafeRun {
+  "Only subscribe Flink application job state changes." in unsafeRun {
     for {
+      // track resource
       _ <- FlinkK8sObserver.track(TrackKey.appJob(234, "fdev", "simple-appjob"))
+      // subscribe job status changes
       _ <- FlinkK8sObserver.evaluatedJobSnaps
              .flatSubscribe()
              .map { case (appId, status) => (appId, status.evalState) }
@@ -78,9 +109,11 @@
     } yield ()
   }
 
-  "Only subscribe Flink job enpoint changes." in unsafeRun {
+  "Only subscribe Flink application job enpoint changes." in unsafeRun {
     for {
+      // track resource
       _ <- FlinkK8sObserver.track(TrackKey.appJob(234, "fdev", "simple-appjob"))
+      // subscribe job status changes
       _ <- FlinkK8sObserver.restSvcEndpointSnaps
              .flatSubscribe()
              .map { case (appId, status) => (appId, status.ipRest) }