[Improve] flink job on yarn exists check improvement (#3424)

Co-authored-by: benjobs <benjobx@gmail.com>
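
This patch adds a pre-start existence check: before a YARN job is (re)submitted, the console first
asks whether a job with the same name is still alive (NEW/SUBMITTED/ACCEPTED/RUNNING) in YARN and,
if so, offers a forced stop. A minimal sketch of the intended service-side usage (illustrative only,
not part of this patch; assumes a wired ApplicationInfoService and a reachable YARN ResourceManager;
appId is a placeholder for the id of an application configured for a YARN execution mode):

    Application app = new Application();
    app.setId(appId);
    AppExistsStateEnum state = applicationInfoService.checkStart(app);
    if (state == AppExistsStateEnum.IN_YARN) {
      // a job with the same name is still alive in YARN; the web UI reacts by calling the
      // forced-stop endpoint before re-submitting (see StartApplicationModal.vue below)
    }
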
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 6059143..412eabe 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -171,6 +171,14 @@
     return RestResponse.success();
   }
 
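+  /** Check whether the job can be started, e.g. whether a job with the same name is still alive in YARN. */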
+  @PermissionAction(id = "#app.id", type = PermissionTypeEnum.APP)
+  @PostMapping(value = "check_start")
+  @RequiresPermissions("app:start")
+  public RestResponse checkStart(Application app) {
+    AppExistsStateEnum stateEnum = applicationInfoService.checkStart(app);
+    return RestResponse.success(stateEnum.get());
+  }
+
   @Operation(
       summary = "Start application",
       tags = {ApiDocConstant.FLINK_APP_OP_TAG})
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
index c86ce3c..8d1e0b1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/ApplicationInfoService.java
@@ -21,6 +21,8 @@
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.enums.AppExistsStateEnum;
 
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+
 import com.baomidou.mybatisplus.extension.service.IService;
 
 import java.io.IOException;
@@ -220,4 +222,18 @@
    * @return A list of strings representing the names of the uploaded jars.
    */
   List<String> listHistoryUploadJars();
+
+  /**
+   * Check whether the application can be started, i.e. whether a job with the same name already exists.
+   *
+   * @param appParam the application to check before starting
+   * @return the {@link AppExistsStateEnum} describing where the job already exists, if anywhere
+   */
+  AppExistsStateEnum checkStart(Application appParam);
+
+  /**
+   * @param appName the YARN application name to look up
+   * @return the YARN applications with the given name that have not yet finished (NEW, SUBMITTED, ACCEPTED or RUNNING)
+   */
+  List<ApplicationReport> getYarnAppReport(String appName);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 8f22ca5..85188dc 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -19,7 +19,6 @@
 
 import org.apache.streampark.common.Constant;
 import org.apache.streampark.common.conf.ConfigKeys;
-import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.FlinkDevelopmentMode;
@@ -27,7 +26,6 @@
 import org.apache.streampark.common.enums.FlinkRestoreMode;
 import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.fs.FsOperator;
-import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
 import org.apache.streampark.common.util.HadoopUtils;
@@ -65,6 +63,7 @@
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.VariableService;
 import org.apache.streampark.console.core.service.application.ApplicationActionService;
+import org.apache.streampark.console.core.service.application.ApplicationInfoService;
 import org.apache.streampark.console.core.service.application.ApplicationManageService;
 import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub;
 import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
@@ -144,6 +143,8 @@
 
   @Autowired private ApplicationManageService applicationManageService;
 
+  @Autowired private ApplicationInfoService applicationInfoService;
+
   @Autowired private ApplicationConfigService configService;
 
   @Autowired private ApplicationLogService applicationLogService;
@@ -229,7 +230,7 @@
       cancelFuture.cancel(true);
     }
     if (startFuture == null && cancelFuture == null) {
-      this.updateToStopped(appParam);
+      this.doStopped(appParam);
     }
   }
 
@@ -325,60 +326,60 @@
 
     cancelFutureMap.put(application.getId(), cancelFuture);
 
-    CompletableFutureUtils.runTimeout(
-            cancelFuture,
-            10L,
-            TimeUnit.MINUTES,
-            cancelResponse -> {
-              applicationLog.setSuccess(true);
-              if (cancelResponse != null && cancelResponse.savePointDir() != null) {
-                String savePointDir = cancelResponse.savePointDir();
-                log.info("savePoint path: {}", savePointDir);
-                SavePoint savePoint = new SavePoint();
-                savePoint.setPath(savePointDir);
-                savePoint.setAppId(application.getId());
-                savePoint.setLatest(true);
-                savePoint.setType(CheckPointTypeEnum.SAVEPOINT.get());
-                savePoint.setCreateTime(new Date());
-                savePoint.setTriggerTime(triggerTime);
-                savePointService.save(savePoint);
+    cancelFuture.whenComplete(
+        (cancelResponse, throwable) -> {
+          cancelFutureMap.remove(application.getId());
+
+          if (throwable != null) {
+            String exception = ExceptionUtils.stringifyException(throwable);
+            applicationLog.setException(exception);
+            applicationLog.setSuccess(false);
+            applicationLogService.save(applicationLog);
+
+            if (throwable instanceof CancellationException) {
+              doStopped(application);
+            } else {
+              log.error("stop flink job failed.", throwable);
+              application.setOptionState(OptionStateEnum.NONE.getValue());
+              application.setState(FlinkAppStateEnum.FAILED.getValue());
+              updateById(application);
+
+              if (appParam.getSavePointed()) {
+                savePointService.expire(application.getId());
               }
+              // re-track the flink job on kubernetes after the failed stop
               if (isKubernetesApp(application)) {
-                k8SFlinkTrackMonitor.unWatching(toTrackId(application));
-              }
-            },
-            e -> {
-              if (e.getCause() instanceof CancellationException) {
-                updateToStopped(application);
+                TrackId id = toTrackId(application);
+                k8SFlinkTrackMonitor.unWatching(id);
+                k8SFlinkTrackMonitor.doWatching(id);
               } else {
-                log.error("stop flink job fail.", e);
-                application.setOptionState(OptionStateEnum.NONE.getValue());
-                application.setState(FlinkAppStateEnum.FAILED.getValue());
-                updateById(application);
-
-                if (appParam.getSavePointed()) {
-                  savePointService.expire(application.getId());
-                }
-
-                // re-tracking flink job on kubernetes and logging exception
-                if (isKubernetesApp(application)) {
-                  TrackId id = toTrackId(application);
-                  k8SFlinkTrackMonitor.unWatching(id);
-                  k8SFlinkTrackMonitor.doWatching(id);
-                } else {
-                  FlinkAppHttpWatcher.unWatching(application.getId());
-                }
-
-                String exception = ExceptionUtils.stringifyException(e);
-                applicationLog.setException(exception);
-                applicationLog.setSuccess(false);
+                FlinkAppHttpWatcher.unWatching(application.getId());
               }
-            })
-        .whenComplete(
-            (t, e) -> {
-              cancelFutureMap.remove(application.getId());
-              applicationLogService.save(applicationLog);
-            });
+            }
+            return;
+          }
+
+          applicationLog.setSuccess(true);
+          // persist the cancel operation log
+          applicationLogService.save(applicationLog);
+
+          if (cancelResponse != null && cancelResponse.savePointDir() != null) {
+            String savePointDir = cancelResponse.savePointDir();
+            log.info("savePoint path: {}", savePointDir);
+            SavePoint savePoint = new SavePoint();
+            savePoint.setPath(savePointDir);
+            savePoint.setAppId(application.getId());
+            savePoint.setLatest(true);
+            savePoint.setType(CheckPointTypeEnum.SAVEPOINT.get());
+            savePoint.setCreateTime(new Date());
+            savePoint.setTriggerTime(triggerTime);
+            savePointService.save(savePoint);
+          }
+
+          if (isKubernetesApp(application)) {
+            k8SFlinkTrackMonitor.unWatching(toTrackId(application));
+          }
+        });
   }
 
   @Override
@@ -392,7 +393,7 @@
     if (FlinkExecutionMode.isYarnMode(application.getFlinkExecutionMode())) {
 
       ApiAlertException.throwIfTrue(
-          checkAppRepeatInYarn(application.getJobName()),
+          !applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(),
           "[StreamPark] The same task name is already running in the yarn queue");
     }
 
@@ -486,98 +487,87 @@
 
     startFutureMap.put(application.getId(), future);
 
-    CompletableFutureUtils.runTimeout(
-            future,
-            2L,
-            TimeUnit.MINUTES,
-            submitResponse -> {
-              if (submitResponse.flinkConfig() != null) {
-                String jmMemory =
-                    submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
-                if (jmMemory != null) {
-                  application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
-                }
-                String tmMemory =
-                    submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY());
-                if (tmMemory != null) {
-                  application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
-                }
-              }
-              application.setAppId(submitResponse.clusterId());
-              if (StringUtils.isNoneEmpty(submitResponse.jobId())) {
-                application.setJobId(submitResponse.jobId());
-              }
+    future.whenComplete(
+        (response, throwable) -> {
+          // 1) remove Future
+          startFutureMap.remove(application.getId());
 
-              if (StringUtils.isNoneEmpty(submitResponse.jobManagerUrl())) {
-                application.setJobManagerUrl(submitResponse.jobManagerUrl());
-                applicationLog.setJobManagerUrl(submitResponse.jobManagerUrl());
-              }
-              applicationLog.setYarnAppId(submitResponse.clusterId());
-              application.setStartTime(new Date());
-              application.setEndTime(null);
-              if (isKubernetesApp(application)) {
-                application.setRelease(ReleaseStateEnum.DONE.get());
-              }
-              updateById(application);
-
-              // if start completed, will be added task to tracking queue
-              if (isKubernetesApp(application)) {
-                k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+          // 2) handle failure or cancellation
+          if (throwable != null) {
+            String exception = ExceptionUtils.stringifyException(throwable);
+            applicationLog.setException(exception);
+            applicationLog.setSuccess(false);
+            applicationLogService.save(applicationLog);
+            if (throwable instanceof CancellationException) {
+              doStopped(application);
+            } else {
+              Application app = getById(appParam.getId());
+              app.setState(FlinkAppStateEnum.FAILED.getValue());
+              app.setOptionState(OptionStateEnum.NONE.getValue());
+              updateById(app);
+              if (isKubernetesApp(app)) {
+                k8SFlinkTrackMonitor.unWatching(toTrackId(app));
               } else {
-                FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.STARTING);
-                FlinkAppHttpWatcher.doWatching(application);
+                FlinkAppHttpWatcher.unWatching(appParam.getId());
               }
+            }
+            return;
+          }
 
-              applicationLog.setSuccess(true);
-              // set savepoint to expire
-              savePointService.expire(application.getId());
-            },
-            e -> {
-              if (e.getCause() instanceof CancellationException) {
-                updateToStopped(application);
-              } else {
-                String exception = ExceptionUtils.stringifyException(e);
-                applicationLog.setException(exception);
-                applicationLog.setSuccess(false);
-                Application app = getById(appParam.getId());
-                app.setState(FlinkAppStateEnum.FAILED.getValue());
-                app.setOptionState(OptionStateEnum.NONE.getValue());
-                updateById(app);
-                if (isKubernetesApp(app)) {
-                  k8SFlinkTrackMonitor.unWatching(toTrackId(app));
-                } else {
-                  FlinkAppHttpWatcher.unWatching(appParam.getId());
+          // 3) handle successful submission
+          applicationLog.setSuccess(true);
+          if (response.flinkConfig() != null) {
+            String jmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
+            if (jmMemory != null) {
+              application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
+            }
+            String tmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY());
+            if (tmMemory != null) {
+              application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
+            }
+          }
+          application.setAppId(response.clusterId());
+          if (StringUtils.isNoneEmpty(response.jobId())) {
+            application.setJobId(response.jobId());
+          }
+
+          if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
+            application.setJobManagerUrl(response.jobManagerUrl());
+            applicationLog.setJobManagerUrl(response.jobManagerUrl());
+          }
+          applicationLog.setYarnAppId(response.clusterId());
+          application.setStartTime(new Date());
+          application.setEndTime(null);
+
+          // if start completed, will be added task to tracking queue
+          if (isKubernetesApp(application)) {
+            application.setRelease(ReleaseStateEnum.DONE.get());
+            k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+            if (FlinkExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
+              String domainName = settingService.getIngressModeDefault();
+              if (StringUtils.isNotBlank(domainName)) {
+                try {
+                  IngressController.configureIngress(
+                      domainName, application.getClusterId(), application.getK8sNamespace());
+                } catch (KubernetesClientException e) {
+                  log.info("Failed to create ingress, stack info:{}", e.getMessage());
+                  applicationLog.setException(e.getMessage());
+                  applicationLog.setSuccess(false);
+                  applicationLogService.save(applicationLog);
+                  application.setState(FlinkAppStateEnum.FAILED.getValue());
+                  application.setOptionState(OptionStateEnum.NONE.getValue());
                 }
               }
-            })
-        .whenComplete(
-            (t, e) -> {
-              if (!K8sFlinkConfig.isV2Enabled()
-                  && FlinkExecutionMode.isKubernetesApplicationMode(
-                      application.getExecutionMode())) {
-                String domainName = settingService.getIngressModeDefault();
-                if (StringUtils.isNotBlank(domainName)) {
-                  try {
-                    IngressController.configureIngress(
-                        domainName, application.getClusterId(), application.getK8sNamespace());
-                  } catch (KubernetesClientException kubernetesClientException) {
-                    log.info(
-                        "Failed to create ingress, stack info:{}",
-                        kubernetesClientException.getMessage());
-                    applicationLog.setException(e.getMessage());
-                    applicationLog.setSuccess(false);
-                    applicationLogService.save(applicationLog);
-                    application.setState(FlinkAppStateEnum.FAILED.getValue());
-                    application.setOptionState(OptionStateEnum.NONE.getValue());
-                    updateById(application);
-                    return;
-                  }
-                }
-              }
-
-              applicationLogService.save(applicationLog);
-              startFutureMap.remove(application.getId());
-            });
+            }
+          } else {
+            FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.STARTING);
+            FlinkAppHttpWatcher.doWatching(application);
+          }
+          // update app
+          updateById(application);
+          // save log
+          applicationLogService.save(applicationLog);
+        });
   }
 
   /**
@@ -781,8 +771,8 @@
     return properties;
   }
 
-  private void updateToStopped(Application app) {
-    Application application = getById(app);
+  private void doStopped(Application appParam) {
+    Application application = getById(appParam);
     application.setOptionState(OptionStateEnum.NONE.getValue());
     application.setState(FlinkAppStateEnum.CANCELED.getValue());
     application.setOptionTime(new Date());
@@ -796,6 +786,18 @@
     } else {
       FlinkAppHttpWatcher.unWatching(application.getId());
     }
+    // kill the corresponding YARN application if it is still alive
+    if (FlinkExecutionMode.isYarnMode(application.getFlinkExecutionMode())) {
+      try {
+        List<ApplicationReport> applications =
+            applicationInfoService.getYarnAppReport(application.getJobName());
+        if (!applications.isEmpty()) {
+          YarnClient yarnClient = HadoopUtils.yarnClient();
+          yarnClient.killApplication(applications.get(0).getApplicationId());
+        }
+      } catch (Exception ignored) {
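+        // best-effort cleanup: the YARN application may already be finished or the ResourceManager unreachable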
+      }
+    }
   }
 
   private String getSavePointed(Application appParam) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 11732cb..95883a0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -20,9 +20,11 @@
 import org.apache.streampark.common.Constant;
 import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.FlinkExecutionMode;
 import org.apache.streampark.common.fs.LfsOperator;
 import org.apache.streampark.common.util.ExceptionUtils;
+import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
@@ -51,9 +53,13 @@
 import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.google.common.collect.Sets;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -65,9 +71,11 @@
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Comparator;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.jar.Manifest;
@@ -318,6 +326,45 @@
   }
 
   @Override
+  public AppExistsStateEnum checkStart(Application appParam) {
+    Application application = getById(appParam.getId());
+    if (application == null) {
+      return AppExistsStateEnum.INVALID;
+    }
+    if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
+      boolean exists = !getYarnAppReport(application.getJobName()).isEmpty();
+      return exists ? AppExistsStateEnum.IN_YARN : AppExistsStateEnum.NO;
+    }
+    // todo on k8s check...
+    return AppExistsStateEnum.NO;
+  }
+
+  @Override
+  public List<ApplicationReport> getYarnAppReport(String appName) {
+    try {
+      YarnClient yarnClient = HadoopUtils.yarnClient();
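+      // only consider unfinished Flink applications that carry the "streampark" YARN tag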
+      Set<String> types =
+          Sets.newHashSet(
+              ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName());
+      EnumSet<YarnApplicationState> states =
+          EnumSet.of(
+              YarnApplicationState.NEW,
+              YarnApplicationState.NEW_SAVING,
+              YarnApplicationState.SUBMITTED,
+              YarnApplicationState.ACCEPTED,
+              YarnApplicationState.RUNNING);
+      Set<String> yarnTag = Sets.newHashSet("streampark");
+      List<ApplicationReport> applications = yarnClient.getApplications(types, states, yarnTag);
+      return applications.stream()
+          .filter(report -> report.getName().equals(appName))
+          .collect(Collectors.toList());
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "getYarnAppReport failed. Ensure that yarn is running properly. ", e);
+    }
+  }
+
+  @Override
   public String k8sStartLog(Long id, Integer offset, Integer limit) throws Exception {
     Application application = getById(id);
     ApiAlertException.throwIfNull(
diff --git a/streampark-console/streampark-console-webapp/src/api/flink/app.ts b/streampark-console/streampark-console-webapp/src/api/flink/app.ts
index 13110db..f32dbda 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app.ts
@@ -40,6 +40,7 @@
   DELETE = '/flink/app/delete',
   DELETE_BAK = '/flink/app/deletebak',
   CREATE = '/flink/app/create',
+  CHECK_START = '/flink/app/check_start',
   START = '/flink/app/start',
   CLEAN = '/flink/app/clean',
   BACKUPS = '/flink/app/backups',
@@ -228,3 +229,7 @@
 export function fetchName(data: { config: string }) {
   return defHttp.post({ url: APP_API.NAME, data });
 }
+
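+/** check whether the job can be started, e.g. whether a job with the same name is still alive in YARN */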
+export function fetchCheckStart(data): Promise<AxiosResponse<number>> {
+  return defHttp.post({ url: APP_API.CHECK_START, data }, { isReturnNativeResponse: true });
+}
diff --git a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
index d4e0ce1..19ba3b1 100644
--- a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
+++ b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue
@@ -63,9 +63,7 @@
       height: 18px;
       background-color: #fff;
       border-radius: 50%;
-      transition:
-        transform 0.5s,
-        background-color 0.5s;
+      transition: transform 0.5s, background-color 0.5s;
       will-change: transform;
     }
 
diff --git a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
index 78cac5c..e08c25f 100644
--- a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
+++ b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue
@@ -179,9 +179,7 @@
     background-color: @component-background;
     border: 1px solid rgb(0 0 0 / 8%);
     border-radius: 0.25rem;
-    box-shadow:
-      0 2px 2px 0 rgb(0 0 0 / 14%),
-      0 3px 1px -2px rgb(0 0 0 / 10%),
+    box-shadow: 0 2px 2px 0 rgb(0 0 0 / 14%), 0 3px 1px -2px rgb(0 0 0 / 10%),
       0 1px 5px 0 rgb(0 0 0 / 6%);
     background-clip: padding-box;
     user-select: none;
diff --git a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
index e5a9dac..1cd7e38 100644
--- a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
+++ b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue
@@ -113,7 +113,7 @@
       });
 
       const getBindValue = computed(
-        () => ({ ...attrs, ...props, ...unref(getProps) }) as Recordable,
+        () => ({ ...attrs, ...props, ...unref(getProps) } as Recordable),
       );
 
       const getSchema = computed((): FormSchema[] => {
diff --git a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
index 8fdbc8f..e89a6ce 100644
--- a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
+++ b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue
@@ -39,9 +39,7 @@
     line-height: 44px;
     background-color: @component-background;
     border-top: 1px solid @border-color-base;
-    box-shadow:
-      0 -6px 16px -8px rgb(0 0 0 / 8%),
-      0 -9px 28px 0 rgb(0 0 0 / 5%),
+    box-shadow: 0 -6px 16px -8px rgb(0 0 0 / 8%), 0 -9px 28px 0 rgb(0 0 0 / 5%),
       0 -12px 48px 16px rgb(0 0 0 / 3%);
     transition: width 0.2s;
 
diff --git a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
index 36ab854..35c0802 100644
--- a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
+++ b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue
@@ -22,7 +22,7 @@
     props: {
       column: {
         type: Object as PropType<BasicColumn>,
-        default: () => ({}) as BasicColumn,
+        default: () => ({} as BasicColumn),
       },
     },
     setup(props) {
diff --git a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
index ef46340..b0b2f83 100644
--- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
+++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
@@ -226,3 +226,11 @@
   CLAIM = 2,
   LEGACY = 3,
 }
+
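+/** mirrors org.apache.streampark.console.core.enums.AppExistsStateEnum on the backend */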
+export enum AppExistsEnum {
+  NO = 0,
+  IN_DB = 1,
+  IN_YARN = 2,
+  IN_KUBERNETES = 3,
+  INVALID = 4,
+}
diff --git a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
index 9a66074..c543be9 100644
--- a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
+++ b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts
@@ -32,12 +32,9 @@
     }
     clear();
 
-    timeId = setTimeout(
-      () => {
-        lockPage();
-      },
-      lockTime * 60 * 1000,
-    );
+    timeId = setTimeout(() => {
+      lockPage();
+    }, lockTime * 60 * 1000);
   }
 
   function lockPage(): void {
diff --git a/streampark-console/streampark-console-webapp/src/utils/props.ts b/streampark-console/streampark-console-webapp/src/utils/props.ts
index 368f490..4a15ec4 100644
--- a/streampark-console/streampark-console-webapp/src/utils/props.ts
+++ b/streampark-console/streampark-console-webapp/src/utils/props.ts
@@ -175,7 +175,7 @@
       : never;
   };
 
-export const definePropType = <T>(val: any) => ({ [wrapperKey]: val }) as PropWrapper<T>;
+export const definePropType = <T>(val: any) => ({ [wrapperKey]: val } as PropWrapper<T>);
 
 export const keyOf = <T extends Object>(arr: T) => Object.keys(arr) as Array<keyof T>;
 export const mutable = <T extends readonly any[] | Record<string, unknown>>(val: T) =>
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
index 359f20a..98a0f01 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
@@ -31,8 +31,9 @@
   import { BasicModal, useModalInner } from '/@/components/Modal';
   import { useMessage } from '/@/hooks/web/useMessage';
   import { useRouter } from 'vue-router';
-  import { fetchStart } from '/@/api/flink/app';
-  import { RestoreModeEnum } from '/@/enums/flinkEnum';
+  import { fetchCheckStart, fetchForcedStop, fetchStart } from '/@/api/flink/app';
+
+  import { AppExistsEnum, RestoreModeEnum } from '/@/enums/flinkEnum';
   import { fetchFlinkEnv } from '/@/api/flink/flinkEnv';
   import { renderFlinkAppRestoreMode } from '/@/views/flink/app/hooks/useFlinkRender';
 
@@ -121,8 +122,21 @@
     baseColProps: { span: 24 },
   });
 
-  /* submit */
   async function handleSubmit() {
+    // if a job with the same name is still alive in YARN, force-stop it before submitting
+    const resp = await fetchCheckStart({
+      id: receiveData.application.id,
+    });
+    if (resp.data.data === AppExistsEnum.IN_YARN) {
+      await fetchForcedStop({
+        id: receiveData.application.id,
+      });
+    }
+    await handleDoSubmit();
+  }
+
+  /* submit */
+  async function handleDoSubmit() {
     try {
       const formValue = (await validate()) as Recordable;
       const savePointed = formValue.startSavePointed;