[KYUUBI #4767] Correct the submit time for BatchJobSubmission and check applicationInfo if submitted application

### _Why are the changes needed?_

- if the kyuubi instance is unreachable, we should not transfer the  batch metadata create time as batch submit time
  - we should always wait the kyuubi instance recovery
  - here using a fake submit time to prevent that the batch be marked  as terminated if application state is NOT_FOUND
- Inside the BatchJobSubmission, using the first get application info time as batch submit time.

In this pr, I also record whether the batch operation submit batch.

If it did submit batch application and the app is not started, we need to mark the batch state as ERROR.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4767 from turboFei/submit_time.

Closes #4767

9d4df0f91 [fwang12] save
3e56a39cb [fwang12] runtime exception -> kyuubi exception
5cac15ec5 [fwang12] nit
92d5000be [fwang12] nit
3678f8f2c [fwang12] wait the app to monitor
d51fb2636 [fwang12] save
708ad20ce [fwang12] refactor
98d49c64e [fwang12] wait
1adbefd59 [fwang12] revert
f3e4f2a11 [fwang12] wait app id ready before monitoring
a3bfe6f56 [fwang12] check app started
7530b5118 [fwang12] check submit app and final state
a41e81d0e [fwang12] refactor
e4217da03 [fwang12] _app start time
3d1e8f022 [fwang12] fake submit timeout
06c8f0a22 [fwang12] correct submit time

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
(cherry picked from commit 94c72734ca2d64328f99715ed648305d73a0718b)
Signed-off-by: fwang12 <fwang12@ebay.com>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index e6433cd..702a9a9 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -78,6 +78,9 @@
 
   @volatile private var _appStartTime = recoveryMetadata.map(_.engineOpenTime).getOrElse(0L)
   def appStartTime: Long = _appStartTime
+  def appStarted: Boolean = _appStartTime > 0
+
+  private lazy val _submitTime = if (appStarted) _appStartTime else System.currentTimeMillis
 
   @VisibleForTesting
   private[kyuubi] val builder: ProcBuilder = {
@@ -101,16 +104,11 @@
 
   override protected def currentApplicationInfo(): Option[ApplicationInfo] = {
     if (isTerminal(state) && _applicationInfo.nonEmpty) return _applicationInfo
-    val submitTime = if (_appStartTime <= 0) {
-      System.currentTimeMillis()
-    } else {
-      _appStartTime
-    }
     val applicationInfo =
       applicationManager.getApplicationInfo(
         builder.clusterManager(),
         batchId,
-        Some(submitTime))
+        Some(_submitTime))
     applicationId(applicationInfo).foreach { _ =>
       if (_appStartTime <= 0) {
         _appStartTime = System.currentTimeMillis()
@@ -142,21 +140,20 @@
 
     if (isTerminalState(state)) {
       if (_applicationInfo.isEmpty) {
-        _applicationInfo =
-          Option(ApplicationInfo(id = null, name = null, state = ApplicationState.NOT_FOUND))
+        _applicationInfo = Some(ApplicationInfo.NOT_FOUND)
       }
     }
 
-    _applicationInfo.foreach { status =>
+    _applicationInfo.foreach { appInfo =>
       val metadataToUpdate = Metadata(
         identifier = batchId,
         state = state.toString,
         engineOpenTime = appStartTime,
-        engineId = status.id,
-        engineName = status.name,
-        engineUrl = status.url.orNull,
-        engineState = status.state.toString,
-        engineError = status.error,
+        engineId = appInfo.id,
+        engineName = appInfo.name,
+        engineUrl = appInfo.url.orNull,
+        engineState = appInfo.state.toString,
+        engineError = appInfo.error,
         endTime = endTime)
       session.sessionManager.updateMetadata(metadataToUpdate)
     }
@@ -258,14 +255,25 @@
 
       if (applicationFailed(_applicationInfo)) {
         process.destroyForcibly()
-        throw new RuntimeException(s"Batch job failed: ${_applicationInfo}")
+        throw new KyuubiException(s"Batch job failed: ${_applicationInfo}")
       } else {
         process.waitFor()
         if (process.exitValue() != 0) {
           throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
         }
 
-        applicationId(_applicationInfo).foreach(monitorBatchJob)
+        while (!appStarted && applicationId(_applicationInfo).isEmpty &&
+          !applicationTerminated(_applicationInfo)) {
+          Thread.sleep(applicationCheckInterval)
+          updateApplicationInfoMetadataIfNeeded()
+        }
+
+        applicationId(_applicationInfo) match {
+          case Some(appId) => monitorBatchJob(appId)
+          case None if !appStarted =>
+            throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
+          case None =>
+        }
       }
     } finally {
       builder.close()
@@ -284,7 +292,7 @@
     if (_applicationInfo.isEmpty) {
       info(s"The $batchType batch[$batchId] job: $appId not found, assume that it has finished.")
     } else if (applicationFailed(_applicationInfo)) {
-      throw new RuntimeException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
+      throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
     } else {
       updateBatchMetadata()
       // TODO: add limit for max batch job submission lifetime
@@ -294,7 +302,7 @@
       }
 
       if (applicationFailed(_applicationInfo)) {
-        throw new RuntimeException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
+        throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
       }
     }
   }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index a7af369..38ce0e2 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -297,7 +297,8 @@
               val batchAppStatus = sessionManager.applicationManager.getApplicationInfo(
                 metadata.clusterManager,
                 batchId,
-                Some(metadata.createTime))
+                // prevent that the batch be marked as terminated if application state is NOT_FOUND
+                Some(metadata.engineOpenTime).filter(_ > 0).orElse(Some(System.currentTimeMillis)))
               buildBatch(metadata, batchAppStatus)
           }
         }