[ISSUE-3066][console-service] Improve streampark-console module base on [3.3 Methods Rule] (#3597)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/ApiAlertException.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/ApiAlertException.java
index 03b2945..ecb1d22 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/ApiAlertException.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/ApiAlertException.java
@@ -46,9 +46,21 @@
     super(message, cause, ResponseCode.CODE_FAIL_ALERT);
   }
 
-  public static void throwIfNull(Object object, String errorMessage) {
+  public static void throwIfNull(Object object, String errorMsgFmt, Object... args) {
     if (Objects.isNull(object)) {
-      throw new ApiAlertException(errorMessage);
+      if (args == null || args.length < 1) {
+        throw new ApiAlertException(errorMsgFmt);
+      }
+      throw new ApiAlertException(String.format(errorMsgFmt, args));
+    }
+  }
+
+  public static void throwIfNotNull(Object object, String errorMsgFmt, Object... args) {
+    if (!Objects.isNull(object)) {
+      if (args == null || args.length < 1) {
+        throw new ApiAlertException(errorMsgFmt);
+      }
+      throw new ApiAlertException(String.format(errorMsgFmt, args));
     }
   }
 
@@ -58,9 +70,12 @@
     }
   }
 
-  public static void throwIfTrue(boolean expression, String errorMessage) {
+  public static void throwIfTrue(boolean expression, String errorMsgFmt, Object... args) {
     if (expression) {
-      throw new ApiAlertException(errorMessage);
+      if (args == null || args.length < 1) {
+        throw new ApiAlertException(errorMsgFmt);
+      }
+      throw new ApiAlertException(String.format(errorMsgFmt, args));
     }
   }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
index 5173724..f3e713e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
@@ -28,6 +28,8 @@
 import org.apache.streampark.console.core.service.application.ApplicationActionService;
 import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
 
+import org.apache.flink.shaded.guava30.com.google.common.base.Preconditions;
+
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import lombok.Data;
@@ -97,41 +99,51 @@
         saveSavepoint(checkPoint, application.getId());
       }
     } else if (shouldProcessFailedTrigger(checkPoint, application.cpFailedTrigger(), status)) {
-      Counter counter = checkPointFailedCache.get(appId);
-      if (counter == null) {
-        checkPointFailedCache.put(appId, new Counter(checkPoint.getTriggerTimestamp()));
-      } else {
-        long minute = counter.getDuration(checkPoint.getTriggerTimestamp());
-        if (minute <= application.getCpFailureRateInterval()
-            && counter.getCount() >= application.getCpMaxFailureInterval()) {
-          checkPointFailedCache.remove(appId);
-          FailoverStrategyEnum failoverStrategyEnum =
-              FailoverStrategyEnum.of(application.getCpFailureAction());
-          if (failoverStrategyEnum == null) {
-            throw new IllegalArgumentException(
-                "Unexpected cpFailureAction: " + application.getCpFailureAction());
-          }
-          switch (failoverStrategyEnum) {
-            case ALERT:
-              alertService.alert(
-                  application.getAlertId(),
-                  AlertTemplate.of(application, CheckPointStatusEnum.FAILED));
-              break;
-            case RESTART:
-              try {
-                applicationActionService.restart(application);
-              } catch (Exception e) {
-                throw new RuntimeException(e);
-              }
-              break;
-            default:
-              // do nothing
-              break;
-          }
-        } else {
-          counter.increment();
+      processFailedCheckpoint(application, checkPoint, appId);
+    }
+  }
+
+  private void processFailedCheckpoint(
+      Application application, @Nonnull CheckPoints.CheckPoint checkPoint, Long appId) {
+    Counter counter = checkPointFailedCache.get(appId);
+    if (counter == null) {
+      checkPointFailedCache.put(appId, new Counter(checkPoint.getTriggerTimestamp()));
+      return;
+    }
+
+    long minute = counter.getDuration(checkPoint.getTriggerTimestamp());
+    if (minute > application.getCpFailureRateInterval()
+        || counter.getCount() < application.getCpMaxFailureInterval()) {
+      counter.increment();
+      return;
+    }
+    checkPointFailedCache.remove(appId);
+    FailoverStrategyEnum failoverStrategyEnum =
+        FailoverStrategyEnum.of(application.getCpFailureAction());
+    Preconditions.checkArgument(
+        failoverStrategyEnum != null,
+        "Unexpected cpFailureAction: %s",
+        application.getCpFailureAction());
+    processFailoverStrategy(application, failoverStrategyEnum);
+  }
+
+  private void processFailoverStrategy(
+      Application application, FailoverStrategyEnum failoverStrategyEnum) {
+    switch (failoverStrategyEnum) {
+      case ALERT:
+        alertService.alert(
+            application.getAlertId(), AlertTemplate.of(application, CheckPointStatusEnum.FAILED));
+        break;
+      case RESTART:
+        try {
+          applicationActionService.restart(application);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
         }
-      }
+        break;
+      default:
+        // do nothing
+        break;
     }
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationBackUp.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationBackUp.java
index 873588c..6a35b1c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationBackUp.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationBackUp.java
@@ -55,6 +55,10 @@
     this.configId = application.getConfigId();
     this.description = application.getBackUpDescription();
     this.createTime = new Date();
+    renderPath(application);
+  }
+
+  private void renderPath(Application application) {
     switch (application.getFlinkExecutionMode()) {
       case KUBERNETES_NATIVE_APPLICATION:
       case KUBERNETES_NATIVE_SESSION:
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
index a504148..970e4f5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
@@ -31,6 +31,7 @@
 import com.baomidou.mybatisplus.annotation.TableName;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.Nullable;
 
 import java.util.Base64;
 import java.util.Collections;
@@ -77,25 +78,7 @@
   }
 
   public Map<String, String> readConfig() {
-    ConfigFileTypeEnum fileType = ConfigFileTypeEnum.of(this.format);
-    Map<String, String> configs = null;
-    if (fileType != null) {
-      switch (fileType) {
-        case YAML:
-          configs = PropertiesUtils.fromYamlTextAsJava(DeflaterUtils.unzipString(this.content));
-          break;
-        case PROPERTIES:
-          configs =
-              PropertiesUtils.fromPropertiesTextAsJava(DeflaterUtils.unzipString(this.content));
-          break;
-        case HOCON:
-          configs = PropertiesUtils.fromHoconTextAsJava(DeflaterUtils.unzipString(this.content));
-          break;
-        default:
-          configs = Collections.emptyMap();
-          break;
-      }
-    }
+    Map<String, String> configs = renderConfigs();
 
     if (MapUtils.isNotEmpty(configs)) {
       return configs.entrySet().stream()
@@ -120,4 +103,22 @@
     }
     return Collections.emptyMap();
   }
+
+  @Nullable
+  private Map<String, String> renderConfigs() {
+    ConfigFileTypeEnum fileType = ConfigFileTypeEnum.of(this.format);
+    if (fileType == null) {
+      return null;
+    }
+    switch (fileType) {
+      case YAML:
+        return PropertiesUtils.fromYamlTextAsJava(DeflaterUtils.unzipString(this.content));
+      case PROPERTIES:
+        return PropertiesUtils.fromPropertiesTextAsJava(DeflaterUtils.unzipString(this.content));
+      case HOCON:
+        return PropertiesUtils.fromHoconTextAsJava(DeflaterUtils.unzipString(this.content));
+      default:
+        return Collections.emptyMap();
+    }
+  }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
index 813598f..1005cb3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
@@ -28,6 +28,7 @@
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.guava30.com.google.common.base.Preconditions;
 
 import com.baomidou.mybatisplus.annotation.FieldStrategy;
 import com.baomidou.mybatisplus.annotation.IdType;
@@ -39,6 +40,8 @@
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.jgit.lib.Constants;
 
+import javax.annotation.Nonnull;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
@@ -189,9 +192,7 @@
     String mvn = windows ? "mvn.cmd" : "mvn";
 
     String mavenHome = System.getenv("M2_HOME");
-    if (mavenHome == null) {
-      mavenHome = System.getenv("MAVEN_HOME");
-    }
+    mavenHome = mavenHome == null ? System.getenv("MAVEN_HOME") : mavenHome;
 
     boolean useWrapper = true;
     if (mavenHome != null) {
@@ -207,43 +208,46 @@
     }
 
     if (useWrapper) {
-      if (windows) {
-        mvn = WebUtils.getAppHome().concat("/bin/mvnw.cmd");
-      } else {
-        mvn = WebUtils.getAppHome().concat("/bin/mvnw");
-      }
+      mvn = WebUtils.getAppHome().concat(windows ? "/bin/mvnw.cmd" : "/bin/mvnw");
     }
 
-    StringBuilder cmdBuffer = new StringBuilder(mvn).append(" clean package -DskipTests ");
+    return renderCmd(mvn);
+  }
 
+  @Nonnull
+  private String renderCmd(String mvn) {
+    StringBuilder cmdBuffer = new StringBuilder(mvn).append(" clean package -DskipTests ");
+    renderCmdByBuildArgs(cmdBuffer);
+    renderCmdBySetting(cmdBuffer);
+    return cmdBuffer.toString();
+  }
+
+  private void renderCmdByBuildArgs(StringBuilder cmdBuffer) {
     if (StringUtils.isNotBlank(this.buildArgs)) {
       String args = getIllegalArgs(this.buildArgs);
-      if (args != null) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Illegal argument: \"%s\" in maven build parameters: %s", args, this.buildArgs));
-      }
+      Preconditions.checkArgument(
+          args == null,
+          "Illegal argument: \"%s\" in maven build parameters: %s",
+          args,
+          this.buildArgs);
       cmdBuffer.append(this.buildArgs.trim());
     }
+  }
 
+  private void renderCmdBySetting(StringBuilder cmdBuffer) {
     String setting = InternalConfigHolder.get(CommonConfig.MAVEN_SETTINGS_PATH());
-    if (StringUtils.isNotBlank(setting)) {
-      String args = getIllegalArgs(setting);
-      if (args != null) {
-        throw new IllegalArgumentException(
-            String.format("Illegal argument \"%s\" in maven-setting file path: %s", args, setting));
-      }
-      File file = new File(setting);
-      if (file.exists() && file.isFile()) {
-        cmdBuffer.append(" --settings ").append(setting);
-      } else {
-        throw new IllegalArgumentException(
-            String.format(
-                "Invalid maven-setting file path \"%s\", the path not exist or is not file",
-                setting));
-      }
+    if (StringUtils.isBlank(setting)) {
+      return;
     }
-    return cmdBuffer.toString();
+    String args = getIllegalArgs(setting);
+    Preconditions.checkArgument(
+        args == null, "Illegal argument \"%s\" in maven-setting file path: %s", args, setting);
+    File file = new File(setting);
+    Preconditions.checkArgument(
+        !file.exists() || !file.isFile(),
+        "Invalid maven-setting file path \"%s\", the path not exist or is not file",
+        setting);
+    cmdBuffer.append(" --settings ").append(setting);
   }
 
   private String getIllegalArgs(String param) {
@@ -269,11 +273,10 @@
   @JsonIgnore
   public String getMavenWorkHome() {
     String buildHome = this.getAppSource().getAbsolutePath();
-    if (StringUtils.isNotBlank(this.getPom())) {
-      buildHome =
-          new File(buildHome.concat("/").concat(this.getPom())).getParentFile().getAbsolutePath();
+    if (StringUtils.isBlank(this.getPom())) {
+      return buildHome;
     }
-    return buildHome;
+    return new File(buildHome.concat("/").concat(this.getPom())).getParentFile().getAbsolutePath();
   }
 
   @JsonIgnore
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index c338c47..032dc9e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -117,17 +117,8 @@
   }
 
   public synchronized void storageInitialize(StorageType storageType) {
-    String appHome = WebUtils.getAppHome();
 
-    if (StringUtils.isBlank(appHome)) {
-      throw new ExceptionInInitializerError(
-          String.format(
-              "[StreamPark] Workspace path check failed,"
-                  + " The system initialization check failed. If started local for development and debugging,"
-                  + " please ensure the -D%s parameter is clearly specified,"
-                  + " more detail: https://streampark.apache.org/docs/user-guide/deployment",
-              ConfigKeys.KEY_APP_HOME()));
-    }
+    checkAppHome();
 
     if (initialized.contains(storageType)) {
       return;
@@ -137,6 +128,35 @@
     Workspace workspace = Workspace.of(storageType);
 
     // 1. prepare workspace dir
+    prepareWorkspace(storageType, fsOperator, workspace);
+    // 2. upload jar.
+    // 2.1) upload client jar
+    uploadClientJar(workspace, fsOperator);
+    // 2.2) upload plugin jar.
+    uploadPluginJar(workspace, fsOperator);
+    // 2.3) upload shims jar
+    uploadShimsJar(workspace, fsOperator);
+    // 2.4) create maven local repository dir
+    createMvnLocalRepoDir();
+
+    initialized.add(storageType);
+  }
+
+  private static void checkAppHome() {
+    final String appHome = WebUtils.getAppHome();
+    if (StringUtils.isBlank(appHome)) {
+      throw new ExceptionInInitializerError(
+          String.format(
+              "[StreamPark] Workspace path check failed,"
+                  + " The system initialization check failed. If started local for development and debugging,"
+                  + " please ensure the -D%s parameter is clearly specified,"
+                  + " more detail: https://streampark.apache.org/docs/user-guide/deployment",
+              ConfigKeys.KEY_APP_HOME()));
+    }
+  }
+
+  private void prepareWorkspace(
+      StorageType storageType, FsOperator fsOperator, Workspace workspace) {
     if (LFS == storageType) {
       fsOperator.mkdirsIfNotExists(Workspace.APP_LOCAL_DIST());
     }
@@ -148,9 +168,16 @@
             workspace.APP_PYTHON(),
             workspace.APP_JARS())
         .forEach(fsOperator::mkdirsIfNotExists);
+  }
 
-    // 2. upload jar.
-    // 2.1) upload client jar
+  private static void createMvnLocalRepoDir() {
+    String localMavenRepo = Workspace.MAVEN_LOCAL_PATH();
+    if (FsOperator.lfs().exists(localMavenRepo)) {
+      FsOperator.lfs().mkdirs(localMavenRepo);
+    }
+  }
+
+  private void uploadClientJar(Workspace workspace, FsOperator fsOperator) {
     File client = WebUtils.getAppClientDir();
     Utils.required(
         client.exists() && client.listFiles().length > 0,
@@ -163,18 +190,9 @@
       log.info("load client:{} to {}", file.getName(), appClient);
       fsOperator.upload(file.getAbsolutePath(), appClient);
     }
+  }
 
-    // 2.2) upload plugin jar.
-    String appPlugins = workspace.APP_PLUGINS();
-    fsOperator.mkCleanDirs(appPlugins);
-
-    File plugins = WebUtils.getAppPluginsDir();
-    for (File file : plugins.listFiles(fileFilter)) {
-      log.info("load plugin:{} to {}", file.getName(), appPlugins);
-      fsOperator.upload(file.getAbsolutePath(), appPlugins);
-    }
-
-    // 2.3) upload shims jar
+  private void uploadShimsJar(Workspace workspace, FsOperator fsOperator) {
     File[] shims =
         WebUtils.getAppLibDir()
             .listFiles(pathname -> pathname.getName().matches(PATTERN_FLINK_SHIMS_JAR.pattern()));
@@ -193,14 +211,17 @@
         fsOperator.upload(file.getAbsolutePath(), shimsPath);
       }
     }
+  }
 
-    // 2.4) create maven local repository dir
-    String localMavenRepo = Workspace.MAVEN_LOCAL_PATH();
-    if (FsOperator.lfs().exists(localMavenRepo)) {
-      FsOperator.lfs().mkdirs(localMavenRepo);
+  private void uploadPluginJar(Workspace workspace, FsOperator fsOperator) {
+    String appPlugins = workspace.APP_PLUGINS();
+    fsOperator.mkCleanDirs(appPlugins);
+
+    File plugins = WebUtils.getAppPluginsDir();
+    for (File file : plugins.listFiles(fileFilter)) {
+      log.info("load plugin:{} to {}", file.getName(), appPlugins);
+      fsOperator.upload(file.getAbsolutePath(), appPlugins);
     }
-
-    initialized.add(storageType);
   }
 
   public void checkFlinkEnv(StorageType storageType, FlinkEnv flinkEnv) throws IOException {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
index 1175096..87d4e1c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
@@ -33,6 +33,8 @@
 import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
 
+import javax.annotation.Nonnull;
+
 import java.util.List;
 
 @Slf4j
@@ -57,35 +59,7 @@
         return true;
       }
       // No use thread pool, ensure that the alarm can be sent successfully
-      Tuple2<Boolean, AlertException> reduce =
-          alertTypeEnums.stream()
-              .map(
-                  alertTypeEnum -> {
-                    try {
-                      boolean alertRes =
-                          SpringContextUtils.getBean(alertTypeEnum.getClazz())
-                              .doAlert(params, alertTemplate);
-                      return new Tuple2<Boolean, AlertException>(alertRes, null);
-                    } catch (AlertException e) {
-                      return new Tuple2<>(false, e);
-                    }
-                  })
-              .reduce(
-                  new Tuple2<>(true, null),
-                  (tp1, tp2) -> {
-                    boolean alertResult = tp1.f0 & tp2.f0;
-                    if (tp1.f1 == null && tp2.f1 == null) {
-                      return new Tuple2<>(tp1.f0 & tp2.f0, null);
-                    }
-                    if (tp1.f1 != null && tp2.f1 != null) {
-                      // merge multiple exception, and keep the details of the first exception
-                      AlertException alertException =
-                          new AlertException(
-                              tp1.f1.getMessage() + "\n" + tp2.f1.getMessage(), tp1.f1);
-                      return new Tuple2<>(alertResult, alertException);
-                    }
-                    return new Tuple2<>(alertResult, tp1.f1 == null ? tp2.f1 : tp1.f1);
-                  });
+      Tuple2<Boolean, AlertException> reduce = triggerAlert(alertTemplate, alertTypeEnums, params);
       if (reduce.f1 != null) {
         throw reduce.f1;
       }
@@ -96,4 +70,36 @@
     }
     return false;
   }
+
+  @Nonnull
+  private Tuple2<Boolean, AlertException> triggerAlert(
+      AlertTemplate alertTemplate, List<AlertTypeEnum> alertTypeEnums, AlertConfigParams params) {
+    return alertTypeEnums.stream()
+        .map(
+            alertTypeEnum -> {
+              try {
+                boolean alertRes =
+                    SpringContextUtils.getBean(alertTypeEnum.getClazz())
+                        .doAlert(params, alertTemplate);
+                return new Tuple2<Boolean, AlertException>(alertRes, null);
+              } catch (AlertException e) {
+                return new Tuple2<>(false, e);
+              }
+            })
+        .reduce(
+            new Tuple2<>(true, null),
+            (tp1, tp2) -> {
+              boolean alertResult = tp1.f0 & tp2.f0;
+              if (tp1.f1 == null && tp2.f1 == null) {
+                return new Tuple2<>(tp1.f0 & tp2.f0, null);
+              }
+              if (tp1.f1 != null && tp2.f1 != null) {
+                // merge multiple exception, and keep the details of the first exception
+                AlertException alertException =
+                    new AlertException(tp1.f1.getMessage() + "\n" + tp2.f1.getMessage(), tp1.f1);
+                return new Tuple2<>(alertResult, alertException);
+              }
+              return new Tuple2<>(alertResult, tp1.f1 == null ? tp2.f1 : tp1.f1);
+            });
+  }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/DingTalkAlertNotifyServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/DingTalkAlertNotifyServiceImpl.java
index 9016d7a..38b24dd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/DingTalkAlertNotifyServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/DingTalkAlertNotifyServiceImpl.java
@@ -37,6 +37,7 @@
 import org.springframework.util.StringUtils;
 import org.springframework.web.client.RestTemplate;
 
+import javax.annotation.Nonnull;
 import javax.crypto.Mac;
 import javax.crypto.spec.SecretKeySpec;
 
@@ -73,29 +74,14 @@
       if (StringUtils.hasLength(contacts)) {
         Collections.addAll(contactList, contacts.split(","));
       }
-      String title = alertTemplate.getTitle();
-      if (!contactList.isEmpty()) {
-        StringJoiner joiner = new StringJoiner(",@", title + " @", "");
-        contactList.forEach(joiner::add);
-        title = joiner.toString();
-      }
-      Map<String, Object> contactMap = new HashMap<>();
-      contactMap.put("atMobiles", contactList);
-      contactMap.put("isAtAll", BooleanUtils.toBoolean(dingTalkParams.getIsAtAll()));
-
+      String title = renderTitle(alertTemplate, contactList);
+      Map<String, Object> contactMap = renderContact(contactList, dingTalkParams);
       // format markdown
       String markdown = FreemarkerUtils.format(template, alertTemplate);
+      Map<String, String> contentMap = renderContent(title, markdown);
+      Map<String, Object> bodyMap = renderBody(contentMap, contactMap);
 
-      Map<String, String> content = new HashMap<>();
-      content.put("title", title);
-      content.put("text", markdown);
-
-      Map<String, Object> body = new HashMap<>();
-      body.put("msgtype", "markdown");
-      body.put("markdown", content);
-      body.put("at", contactMap);
-
-      sendMessage(dingTalkParams, body);
+      sendMessage(dingTalkParams, bodyMap);
       return true;
     } catch (AlertException alertException) {
       throw alertException;
@@ -104,6 +90,43 @@
     }
   }
 
+  @Nonnull
+  private Map<String, Object> renderBody(
+      Map<String, String> content, Map<String, Object> contactMap) {
+    Map<String, Object> body = new HashMap<>();
+    body.put("msgtype", "markdown");
+    body.put("markdown", content);
+    body.put("at", contactMap);
+    return body;
+  }
+
+  @Nonnull
+  private Map<String, String> renderContent(String title, String markdown) {
+    Map<String, String> content = new HashMap<>();
+    content.put("title", title);
+    content.put("text", markdown);
+    return content;
+  }
+
+  @Nonnull
+  private Map<String, Object> renderContact(
+      List<String> contactList, AlertDingTalkParams dingTalkParams) {
+    Map<String, Object> contactMap = new HashMap<>();
+    contactMap.put("atMobiles", contactList);
+    contactMap.put("isAtAll", BooleanUtils.toBoolean(dingTalkParams.getIsAtAll()));
+    return contactMap;
+  }
+
+  private String renderTitle(AlertTemplate alertTemplate, List<String> contactList) {
+    String title = alertTemplate.getTitle();
+    if (!contactList.isEmpty()) {
+      StringJoiner joiner = new StringJoiner(",@", title + " @", "");
+      contactList.forEach(joiner::add);
+      title = joiner.toString();
+    }
+    return title;
+  }
+
   private void sendMessage(AlertDingTalkParams params, Map<String, Object> body)
       throws AlertException {
     // get webhook url
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/HttpCallbackAlertNotifyServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/HttpCallbackAlertNotifyServiceImpl.java
index 7568f9d..2686ed2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/HttpCallbackAlertNotifyServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/HttpCallbackAlertNotifyServiceImpl.java
@@ -41,6 +41,8 @@
 import org.springframework.web.client.ResponseExtractor;
 import org.springframework.web.client.RestTemplate;
 
+import javax.annotation.Nonnull;
+
 import java.util.Map;
 
 @Slf4j
@@ -77,24 +79,7 @@
   private void sendMessage(AlertHttpCallbackParams params, Map<String, Object> body)
       throws AlertException {
     String url = params.getUrl();
-    HttpHeaders headers = new HttpHeaders();
-    String contentType = params.getContentType();
-    MediaType mediaType = MediaType.APPLICATION_JSON;
-    if (StringUtils.hasLength(contentType)) {
-      switch (contentType.toLowerCase()) {
-        case MediaType.APPLICATION_FORM_URLENCODED_VALUE:
-          mediaType = MediaType.APPLICATION_FORM_URLENCODED;
-          break;
-        case MediaType.MULTIPART_FORM_DATA_VALUE:
-          mediaType = MediaType.MULTIPART_FORM_DATA;
-          break;
-        case MediaType.APPLICATION_JSON_VALUE:
-        default:
-          break;
-      }
-    }
-    headers.setContentType(mediaType);
-
+    HttpHeaders headers = getHttpHeaders(params);
     ResponseEntity<Object> response;
     try {
       HttpMethod httpMethod = HttpMethod.POST;
@@ -119,4 +104,26 @@
       throw new AlertException(String.format("Failed to request httpCallback alert,%nurl:%s", url));
     }
   }
+
+  @Nonnull
+  private HttpHeaders getHttpHeaders(AlertHttpCallbackParams params) {
+    HttpHeaders headers = new HttpHeaders();
+    String contentType = params.getContentType();
+    MediaType mediaType = MediaType.APPLICATION_JSON;
+    if (StringUtils.hasLength(contentType)) {
+      switch (contentType.toLowerCase()) {
+        case MediaType.APPLICATION_FORM_URLENCODED_VALUE:
+          mediaType = MediaType.APPLICATION_FORM_URLENCODED;
+          break;
+        case MediaType.MULTIPART_FORM_DATA_VALUE:
+          mediaType = MediaType.MULTIPART_FORM_DATA;
+          break;
+        case MediaType.APPLICATION_JSON_VALUE:
+        default:
+          break;
+      }
+    }
+    headers.setContentType(mediaType);
+    return headers;
+  }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/LarkAlertNotifyServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/LarkAlertNotifyServiceImpl.java
index 248c6b4..86382c5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/LarkAlertNotifyServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/LarkAlertNotifyServiceImpl.java
@@ -37,6 +37,7 @@
 import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestTemplate;
 
+import javax.annotation.Nonnull;
 import javax.crypto.Mac;
 import javax.crypto.spec.SecretKeySpec;
 
@@ -73,17 +74,7 @@
       String markdown = FreemarkerUtils.format(template, alertTemplate);
       Map<String, Object> cardMap =
           mapper.readValue(markdown, new TypeReference<Map<String, Object>>() {});
-
-      Map<String, Object> body = new HashMap<>();
-      // get sign
-      if (alertLarkParams.getSecretEnable()) {
-        long timestamp = System.currentTimeMillis() / 1000;
-        String sign = getSign(alertLarkParams.getSecretToken(), timestamp);
-        body.put("timestamp", timestamp);
-        body.put("sign", sign);
-      }
-      body.put("msg_type", "interactive");
-      body.put("card", cardMap);
+      Map<String, Object> body = renderBody(alertLarkParams, cardMap);
       sendMessage(alertLarkParams, body);
       return true;
     } catch (AlertException alertException) {
@@ -93,6 +84,22 @@
     }
   }
 
+  @Nonnull
+  private Map<String, Object> renderBody(
+      AlertLarkParams alertLarkParams, Map<String, Object> cardMap) {
+    Map<String, Object> body = new HashMap<>();
+    // get sign
+    if (alertLarkParams.getSecretEnable()) {
+      long timestamp = System.currentTimeMillis() / 1000;
+      String sign = getSign(alertLarkParams.getSecretToken(), timestamp);
+      body.put("timestamp", timestamp);
+      body.put("sign", sign);
+    }
+    body.put("msg_type", "interactive");
+    body.put("card", cardMap);
+    return body;
+  }
+
   private void sendMessage(AlertLarkParams params, Map<String, Object> body) throws AlertException {
     // get webhook url
     String url = getWebhook(params);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 70f0231..2974faa 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -99,6 +99,7 @@
 import com.google.common.collect.Sets;
 import io.fabric8.kubernetes.client.KubernetesClientException;
 import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.NotNull;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
@@ -390,7 +391,6 @@
     }
 
     if (FlinkExecutionMode.isYarnMode(application.getFlinkExecutionMode())) {
-
       ApiAlertException.throwIfTrue(
           !applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(),
           "[StreamPark] The same task name is already running in the yarn queue");
@@ -400,7 +400,6 @@
     Utils.requireNotNull(buildPipeline);
 
     FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
-
     ApiAlertException.throwIfNull(flinkEnv, "[StreamPark] can no found flink version");
 
     // if manually started, clear the restart flag
@@ -413,17 +412,9 @@
       appParam.setSavePointed(true);
       application.setRestartCount(application.getRestartCount() + 1);
     }
-
     // 2) update app state to starting...
     starting(application);
-
-    String jobId = new JobID().toHexString();
-    ApplicationLog applicationLog = new ApplicationLog();
-    applicationLog.setOptionName(OperationEnum.START.getValue());
-    applicationLog.setAppId(application.getId());
-    applicationLog.setOptionTime(new Date());
-    applicationLog.setUserId(commonService.getUserId());
-
+    ApplicationLog applicationLog = constructAppLog(application);
     // set the latest to Effective, (it will only become the current effective at this time)
     applicationManageService.toEffective(application);
 
@@ -468,7 +459,7 @@
             flinkEnv.getFlinkConf(),
             FlinkDevelopmentMode.of(application.getJobType()),
             application.getId(),
-            jobId,
+            new JobID().toHexString(),
             application.getJobName(),
             appConf,
             application.getApplicationType(),
@@ -491,85 +482,114 @@
         (response, throwable) -> {
           // 1) remove Future
           startFutureMap.remove(application.getId());
-
           // 2) exception
           if (throwable != null) {
-            String exception = ExceptionUtils.stringifyException(throwable);
-            applicationLog.setException(exception);
-            applicationLog.setSuccess(false);
-            applicationLogService.save(applicationLog);
-            if (throwable instanceof CancellationException) {
-              doStopped(application.getId());
-            } else {
-              Application app = getById(appParam.getId());
-              app.setState(FlinkAppStateEnum.FAILED.getValue());
-              app.setOptionState(OptionStateEnum.NONE.getValue());
-              updateById(app);
-              if (isKubernetesApp(app)) {
-                k8SFlinkTrackMonitor.unWatching(toTrackId(app));
-              } else {
-                FlinkAppHttpWatcher.unWatching(appParam.getId());
-              }
-            }
+            processForException(appParam, throwable, applicationLog, application);
             return;
           }
-
           // 3) success
-          applicationLog.setSuccess(true);
-          if (response.flinkConfig() != null) {
-            String jmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
-            if (jmMemory != null) {
-              application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
-            }
-            String tmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY());
-            if (tmMemory != null) {
-              application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
-            }
-          }
-          application.setAppId(response.clusterId());
-          if (StringUtils.isNoneEmpty(response.jobId())) {
-            application.setJobId(response.jobId());
-          }
-
-          if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
-            application.setJobManagerUrl(response.jobManagerUrl());
-            applicationLog.setJobManagerUrl(response.jobManagerUrl());
-          }
-          applicationLog.setYarnAppId(response.clusterId());
-          application.setStartTime(new Date());
-          application.setEndTime(null);
-
-          // if start completed, will be added task to tracking queue
-          if (isKubernetesApp(application)) {
-            application.setRelease(ReleaseStateEnum.DONE.get());
-            k8SFlinkTrackMonitor.doWatching(toTrackId(application));
-            if (FlinkExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
-              String domainName = settingService.getIngressModeDefault();
-              if (StringUtils.isNotBlank(domainName)) {
-                try {
-                  IngressController.configureIngress(
-                      domainName, application.getClusterId(), application.getK8sNamespace());
-                } catch (KubernetesClientException e) {
-                  log.info("Failed to create ingress, stack info:{}", e.getMessage());
-                  applicationLog.setException(e.getMessage());
-                  applicationLog.setSuccess(false);
-                  applicationLogService.save(applicationLog);
-                  application.setState(FlinkAppStateEnum.FAILED.getValue());
-                  application.setOptionState(OptionStateEnum.NONE.getValue());
-                }
-              }
-            }
-          } else {
-            FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.STARTING);
-            FlinkAppHttpWatcher.doWatching(application);
-          }
-          // update app
-          updateById(application);
-          // save log
-          applicationLogService.save(applicationLog);
+          processForSuccess(appParam, response, applicationLog, application);
         });
   }
 
+  @NotNull
+  private ApplicationLog constructAppLog(Application application) {
+    ApplicationLog applicationLog = new ApplicationLog();
+    applicationLog.setOptionName(OperationEnum.START.getValue());
+    applicationLog.setAppId(application.getId());
+    applicationLog.setOptionTime(new Date());
+    applicationLog.setUserId(commonService.getUserId());
+    return applicationLog;
+  }
+
+  private void processForSuccess(
+      Application appParam,
+      SubmitResponse response,
+      ApplicationLog applicationLog,
+      Application application) {
+    applicationLog.setSuccess(true);
+    if (response.flinkConfig() != null) {
+      String jmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
+      if (jmMemory != null) {
+        application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
+      }
+      String tmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY());
+      if (tmMemory != null) {
+        application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
+      }
+    }
+    application.setAppId(response.clusterId());
+    if (StringUtils.isNoneEmpty(response.jobId())) {
+      application.setJobId(response.jobId());
+    }
+
+    if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
+      application.setJobManagerUrl(response.jobManagerUrl());
+      applicationLog.setJobManagerUrl(response.jobManagerUrl());
+    }
+    applicationLog.setYarnAppId(response.clusterId());
+    application.setStartTime(new Date());
+    application.setEndTime(null);
+
+    // if start completed, will be added task to tracking queue
+    if (isKubernetesApp(application)) {
+      processForK8sApp(application, applicationLog);
+    } else {
+      FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.STARTING);
+      FlinkAppHttpWatcher.doWatching(application);
+    }
+    // update app
+    updateById(application);
+    // save log
+    applicationLogService.save(applicationLog);
+  }
+
+  private void processForK8sApp(Application application, ApplicationLog applicationLog) {
+    application.setRelease(ReleaseStateEnum.DONE.get());
+    k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+    if (!FlinkExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
+      return;
+    }
+    String domainName = settingService.getIngressModeDefault();
+    if (StringUtils.isNotBlank(domainName)) {
+      try {
+        IngressController.configureIngress(
+            domainName, application.getClusterId(), application.getK8sNamespace());
+      } catch (KubernetesClientException e) {
+        log.info("Failed to create ingress, stack info:{}", e.getMessage());
+        applicationLog.setException(e.getMessage());
+        applicationLog.setSuccess(false);
+        applicationLogService.save(applicationLog);
+        application.setState(FlinkAppStateEnum.FAILED.getValue());
+        application.setOptionState(OptionStateEnum.NONE.getValue());
+      }
+    }
+  }
+
+  private void processForException(
+      Application appParam,
+      Throwable throwable,
+      ApplicationLog applicationLog,
+      Application application) {
+    String exception = ExceptionUtils.stringifyException(throwable);
+    applicationLog.setException(exception);
+    applicationLog.setSuccess(false);
+    applicationLogService.save(applicationLog);
+    if (throwable instanceof CancellationException) {
+      doStopped(application.getId());
+    } else {
+      Application app = getById(appParam.getId());
+      app.setState(FlinkAppStateEnum.FAILED.getValue());
+      app.setOptionState(OptionStateEnum.NONE.getValue());
+      updateById(app);
+      if (isKubernetesApp(app)) {
+        k8SFlinkTrackMonitor.unWatching(toTrackId(app));
+      } else {
+        FlinkAppHttpWatcher.unWatching(appParam.getId());
+      }
+    }
+  }
+
   /**
    * Check whether a job with the same name is running in the yarn queue
    *
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 9652666..2a59bad 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -64,6 +64,8 @@
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.Nonnull;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
@@ -147,26 +149,15 @@
       }
       JobsOverview.Task task = app.getOverview();
       if (task != null) {
-        overview.setTotal(overview.getTotal() + task.getTotal());
-        overview.setCreated(overview.getCreated() + task.getCreated());
-        overview.setScheduled(overview.getScheduled() + task.getScheduled());
-        overview.setDeploying(overview.getDeploying() + task.getDeploying());
-        overview.setRunning(overview.getRunning() + task.getRunning());
-        overview.setFinished(overview.getFinished() + task.getFinished());
-        overview.setCanceling(overview.getCanceling() + task.getCanceling());
-        overview.setCanceled(overview.getCanceled() + task.getCanceled());
-        overview.setFailed(overview.getFailed() + task.getFailed());
-        overview.setReconciling(overview.getReconciling() + task.getReconciling());
+        renderJobsOverviewTaskByTask(overview, task);
       }
     }
 
     // merge metrics from flink kubernetes cluster
-    FlinkMetricCV k8sMetric;
-    if (K8sFlinkConfig.isV2Enabled()) {
-      k8sMetric = flinkK8sObserver.getAggClusterMetricCV(teamId);
-    } else {
-      k8sMetric = k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString());
-    }
+    FlinkMetricCV k8sMetric =
+        K8sFlinkConfig.isV2Enabled()
+            ? flinkK8sObserver.getAggClusterMetricCV(teamId)
+            : k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString());
     if (k8sMetric != null) {
       totalJmMemory += k8sMetric.totalJmMemory();
       totalTmMemory += k8sMetric.totalTmMemory();
@@ -174,14 +165,45 @@
       totalSlot += k8sMetric.totalSlot();
       availableSlot += k8sMetric.availableSlot();
       runningJob += k8sMetric.runningJob();
-      overview.setTotal(overview.getTotal() + k8sMetric.totalJob());
-      overview.setRunning(overview.getRunning() + k8sMetric.runningJob());
-      overview.setFinished(overview.getFinished() + k8sMetric.finishedJob());
-      overview.setCanceled(overview.getCanceled() + k8sMetric.cancelledJob());
-      overview.setFailed(overview.getFailed() + k8sMetric.failedJob());
+      renderJobsOverviewTaskByK8sMetric(overview, k8sMetric);
     }
 
     // result json
+    return constructDashboardMap(
+        overview, totalJmMemory, totalTmMemory, totalTm, availableSlot, totalSlot, runningJob);
+  }
+
+  private void renderJobsOverviewTaskByTask(JobsOverview.Task overview, JobsOverview.Task task) {
+    overview.setTotal(overview.getTotal() + task.getTotal());
+    overview.setCreated(overview.getCreated() + task.getCreated());
+    overview.setScheduled(overview.getScheduled() + task.getScheduled());
+    overview.setDeploying(overview.getDeploying() + task.getDeploying());
+    overview.setRunning(overview.getRunning() + task.getRunning());
+    overview.setFinished(overview.getFinished() + task.getFinished());
+    overview.setCanceling(overview.getCanceling() + task.getCanceling());
+    overview.setCanceled(overview.getCanceled() + task.getCanceled());
+    overview.setFailed(overview.getFailed() + task.getFailed());
+    overview.setReconciling(overview.getReconciling() + task.getReconciling());
+  }
+
+  private void renderJobsOverviewTaskByK8sMetric(
+      JobsOverview.Task overview, FlinkMetricCV k8sMetric) {
+    overview.setTotal(overview.getTotal() + k8sMetric.totalJob());
+    overview.setRunning(overview.getRunning() + k8sMetric.runningJob());
+    overview.setFinished(overview.getFinished() + k8sMetric.finishedJob());
+    overview.setCanceled(overview.getCanceled() + k8sMetric.cancelledJob());
+    overview.setFailed(overview.getFailed() + k8sMetric.failedJob());
+  }
+
+  @Nonnull
+  private Map<String, Serializable> constructDashboardMap(
+      JobsOverview.Task overview,
+      Integer totalJmMemory,
+      Integer totalTmMemory,
+      Integer totalTm,
+      Integer availableSlot,
+      Integer totalSlot,
+      Integer runningJob) {
     Map<String, Serializable> dashboardDataMap = new HashMap<>(8);
     dashboardDataMap.put("task", overview);
     dashboardDataMap.put("jmMemory", totalJmMemory);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 102e073..0fce782 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -275,29 +275,12 @@
                     // set duration
                     String restUrl = k8SFlinkTrackMonitor.getRemoteRestUrl(toTrackId(record));
                     record.setFlinkRestUrl(restUrl);
-                    if (record.getTracking() == 1
-                        && record.getStartTime() != null
-                        && record.getStartTime().getTime() > 0) {
-                      record.setDuration(now - record.getStartTime().getTime());
-                    }
+                    setAppDurationIfNeeded(record, now);
                   }
                   if (pipeStates.containsKey(record.getId())) {
                     record.setBuildStatus(pipeStates.get(record.getId()).getCode());
                   }
-
-                  AppControl appControl =
-                      new AppControl()
-                          .setAllowBuild(
-                              record.getBuildStatus() == null
-                                  || !PipelineStatusEnum.running
-                                      .getCode()
-                                      .equals(record.getBuildStatus()))
-                          .setAllowStart(
-                              !record.shouldTracking()
-                                  && PipelineStatusEnum.success
-                                      .getCode()
-                                      .equals(record.getBuildStatus()))
-                          .setAllowStop(record.isRunning());
+                  AppControl appControl = getAppControl(record);
                   record.setAppControl(appControl);
                 })
             .collect(Collectors.toList());
@@ -305,6 +288,25 @@
     return page;
   }
 
+  private void setAppDurationIfNeeded(Application record, long now) {
+    if (record.getTracking() == 1
+        && record.getStartTime() != null
+        && record.getStartTime().getTime() > 0) {
+      record.setDuration(now - record.getStartTime().getTime());
+    }
+  }
+
+  private AppControl getAppControl(Application record) {
+    return new AppControl()
+        .setAllowBuild(
+            record.getBuildStatus() == null
+                || !PipelineStatusEnum.running.getCode().equals(record.getBuildStatus()))
+        .setAllowStart(
+            !record.shouldTracking()
+                && PipelineStatusEnum.success.getCode().equals(record.getBuildStatus()))
+        .setAllowStop(record.isRunning());
+  }
+
   @Override
   public void changeOwnership(Long userId, Long targetUserId) {
     LambdaUpdateWrapper<Application> updateWrapper =
@@ -792,11 +794,7 @@
 
       // set duration
       long now = System.currentTimeMillis();
-      if (application.getTracking() == 1
-          && application.getStartTime() != null
-          && application.getStartTime().getTime() > 0) {
-        application.setDuration(now - application.getStartTime().getTime());
-      }
+      setAppDurationIfNeeded(application, now);
     }
 
     application.setYarnQueueByHotParams();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index a43e1fe..37c09e9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -168,16 +168,12 @@
    * @return Whether the pipeline was successfully started
    */
   @Override
-  public boolean buildApplication(@NotNull Long appId, boolean forceBuild) {
+  public boolean buildApplication(@Nonnull Long appId, boolean forceBuild) {
     // check the build environment
     checkBuildEnv(appId, forceBuild);
 
     Application app = applicationManageService.getById(appId);
-    ApplicationLog applicationLog = new ApplicationLog();
-    applicationLog.setOptionName(RELEASE.getValue());
-    applicationLog.setAppId(app.getId());
-    applicationLog.setOptionTime(new Date());
-    applicationLog.setUserId(commonService.getUserId());
+    ApplicationLog applicationLog = getApplicationLog(app);
 
     // check if you need to go through the build process (if the jar and pom have changed,
     // you need to go through the build process, if other common parameters are modified,
@@ -361,25 +357,7 @@
         });
     // save docker resolve progress detail to cache, only for flink-k8s application mode.
     if (PipelineTypeEnum.FLINK_NATIVE_K8S_APPLICATION == pipeline.pipeType()) {
-      pipeline
-          .as(FlinkK8sApplicationBuildPipeline.class)
-          .registerDockerProgressWatcher(
-              new DockerProgressWatcher() {
-                @Override
-                public void onDockerPullProgressChange(DockerPullSnapshot snapshot) {
-                  DOCKER_PULL_PG_SNAPSHOTS.put(app.getId(), snapshot);
-                }
-
-                @Override
-                public void onDockerBuildProgressChange(DockerBuildSnapshot snapshot) {
-                  DOCKER_BUILD_PG_SNAPSHOTS.put(app.getId(), snapshot);
-                }
-
-                @Override
-                public void onDockerPushProgressChange(DockerPushSnapshot snapshot) {
-                  DOCKER_PUSH_PG_SNAPSHOTS.put(app.getId(), snapshot);
-                }
-              });
+      registerDockerProgressWatcher(pipeline, app);
     }
     // save pipeline instance snapshot to db before release it.
     AppBuildPipeline buildPipeline =
@@ -393,6 +371,38 @@
     return saved;
   }
 
+  private void registerDockerProgressWatcher(BuildPipeline pipeline, Application app) {
+    pipeline
+        .as(FlinkK8sApplicationBuildPipeline.class)
+        .registerDockerProgressWatcher(
+            new DockerProgressWatcher() {
+              @Override
+              public void onDockerPullProgressChange(DockerPullSnapshot snapshot) {
+                DOCKER_PULL_PG_SNAPSHOTS.put(app.getId(), snapshot);
+              }
+
+              @Override
+              public void onDockerBuildProgressChange(DockerBuildSnapshot snapshot) {
+                DOCKER_BUILD_PG_SNAPSHOTS.put(app.getId(), snapshot);
+              }
+
+              @Override
+              public void onDockerPushProgressChange(DockerPushSnapshot snapshot) {
+                DOCKER_PUSH_PG_SNAPSHOTS.put(app.getId(), snapshot);
+              }
+            });
+  }
+
+  @Nonnull
+  private ApplicationLog getApplicationLog(Application app) {
+    ApplicationLog applicationLog = new ApplicationLog();
+    applicationLog.setOptionName(RELEASE.getValue());
+    applicationLog.setAppId(app.getId());
+    applicationLog.setOptionTime(new Date());
+    applicationLog.setUserId(commonService.getUserId());
+    return applicationLog;
+  }
+
   /**
    * check the build environment
    *
@@ -443,69 +453,26 @@
           localWorkspace = app.getLocalAppHome();
         }
         FlinkYarnApplicationBuildRequest yarnAppRequest =
-            new FlinkYarnApplicationBuildRequest(
-                app.getJobName(),
-                mainClass,
-                localWorkspace,
-                yarnProvidedPath,
-                app.getDevelopmentMode(),
-                getMergedDependencyInfo(app));
+            buildFlinkYarnApplicationBuildRequest(app, mainClass, localWorkspace, yarnProvidedPath);
         log.info("Submit params to building pipeline : {}", yarnAppRequest);
         return FlinkYarnApplicationBuildPipeline.of(yarnAppRequest);
       case YARN_PER_JOB:
       case YARN_SESSION:
       case REMOTE:
         FlinkRemotePerJobBuildRequest buildRequest =
-            new FlinkRemotePerJobBuildRequest(
-                app.getJobName(),
-                app.getLocalAppHome(),
-                mainClass,
-                flinkUserJar,
-                app.isCustomCodeJob(),
-                app.getFlinkExecutionMode(),
-                app.getDevelopmentMode(),
-                flinkEnv.getFlinkVersion(),
-                getMergedDependencyInfo(app));
+            buildFlinkRemotePerJobBuildRequest(app, mainClass, flinkUserJar, flinkEnv);
         log.info("Submit params to building pipeline : {}", buildRequest);
         return FlinkRemoteBuildPipeline.of(buildRequest);
       case KUBERNETES_NATIVE_SESSION:
         FlinkK8sSessionBuildRequest k8sSessionBuildRequest =
-            new FlinkK8sSessionBuildRequest(
-                app.getJobName(),
-                app.getLocalAppHome(),
-                mainClass,
-                flinkUserJar,
-                app.getFlinkExecutionMode(),
-                app.getDevelopmentMode(),
-                flinkEnv.getFlinkVersion(),
-                getMergedDependencyInfo(app),
-                app.getClusterId(),
-                app.getK8sNamespace());
+            buildFlinkK8sSessionBuildRequest(app, mainClass, flinkUserJar, flinkEnv);
         log.info("Submit params to building pipeline : {}", k8sSessionBuildRequest);
         return FlinkK8sSessionBuildPipeline.of(k8sSessionBuildRequest);
       case KUBERNETES_NATIVE_APPLICATION:
         DockerConfig dockerConfig = settingService.getDockerConfig();
         FlinkK8sApplicationBuildRequest k8sApplicationBuildRequest =
-            new FlinkK8sApplicationBuildRequest(
-                app.getJobName(),
-                app.getLocalAppHome(),
-                mainClass,
-                flinkUserJar,
-                app.getFlinkExecutionMode(),
-                app.getDevelopmentMode(),
-                flinkEnv.getFlinkVersion(),
-                getMergedDependencyInfo(app),
-                app.getClusterId(),
-                app.getK8sNamespace(),
-                app.getFlinkImage(),
-                app.getK8sPodTemplates(),
-                app.getK8sHadoopIntegration() != null ? app.getK8sHadoopIntegration() : false,
-                DockerConf.of(
-                    dockerConfig.getAddress(),
-                    dockerConfig.getNamespace(),
-                    dockerConfig.getUser(),
-                    dockerConfig.getPassword()),
-                app.getIngressTemplate());
+            buildFlinkK8sApplicationBuildRequest(
+                app, mainClass, flinkUserJar, flinkEnv, dockerConfig);
         log.info("Submit params to building pipeline : {}", k8sApplicationBuildRequest);
         if (K8sFlinkConfig.isV2Enabled()) {
           return FlinkK8sApplicationBuildPipelineV2.of(k8sApplicationBuildRequest);
@@ -517,6 +484,84 @@
     }
   }
 
+  @NotNull
+  private FlinkYarnApplicationBuildRequest buildFlinkYarnApplicationBuildRequest(
+      @NotNull Application app, String mainClass, String localWorkspace, String yarnProvidedPath) {
+    FlinkYarnApplicationBuildRequest yarnAppRequest =
+        new FlinkYarnApplicationBuildRequest(
+            app.getJobName(),
+            mainClass,
+            localWorkspace,
+            yarnProvidedPath,
+            app.getDevelopmentMode(),
+            getMergedDependencyInfo(app));
+    return yarnAppRequest;
+  }
+
+  @Nonnull
+  private FlinkK8sApplicationBuildRequest buildFlinkK8sApplicationBuildRequest(
+      @Nonnull Application app,
+      String mainClass,
+      String flinkUserJar,
+      FlinkEnv flinkEnv,
+      DockerConfig dockerConfig) {
+    FlinkK8sApplicationBuildRequest k8sApplicationBuildRequest =
+        new FlinkK8sApplicationBuildRequest(
+            app.getJobName(),
+            app.getLocalAppHome(),
+            mainClass,
+            flinkUserJar,
+            app.getFlinkExecutionMode(),
+            app.getDevelopmentMode(),
+            flinkEnv.getFlinkVersion(),
+            getMergedDependencyInfo(app),
+            app.getClusterId(),
+            app.getK8sNamespace(),
+            app.getFlinkImage(),
+            app.getK8sPodTemplates(),
+            app.getK8sHadoopIntegration() != null ? app.getK8sHadoopIntegration() : false,
+            DockerConf.of(
+                dockerConfig.getAddress(),
+                dockerConfig.getNamespace(),
+                dockerConfig.getUser(),
+                dockerConfig.getPassword()),
+            app.getIngressTemplate());
+    return k8sApplicationBuildRequest;
+  }
+
+  @Nonnull
+  private FlinkK8sSessionBuildRequest buildFlinkK8sSessionBuildRequest(
+      @Nonnull Application app, String mainClass, String flinkUserJar, FlinkEnv flinkEnv) {
+    FlinkK8sSessionBuildRequest k8sSessionBuildRequest =
+        new FlinkK8sSessionBuildRequest(
+            app.getJobName(),
+            app.getLocalAppHome(),
+            mainClass,
+            flinkUserJar,
+            app.getFlinkExecutionMode(),
+            app.getDevelopmentMode(),
+            flinkEnv.getFlinkVersion(),
+            getMergedDependencyInfo(app),
+            app.getClusterId(),
+            app.getK8sNamespace());
+    return k8sSessionBuildRequest;
+  }
+
+  @Nonnull
+  private FlinkRemotePerJobBuildRequest buildFlinkRemotePerJobBuildRequest(
+      @Nonnull Application app, String mainClass, String flinkUserJar, FlinkEnv flinkEnv) {
+    return new FlinkRemotePerJobBuildRequest(
+        app.getJobName(),
+        app.getLocalAppHome(),
+        mainClass,
+        flinkUserJar,
+        app.isCustomCodeJob(),
+        app.getFlinkExecutionMode(),
+        app.getDevelopmentMode(),
+        flinkEnv.getFlinkVersion(),
+        getMergedDependencyInfo(app));
+  }
+
   /** copy from {@link ApplicationActionService#start(Application, boolean)} */
   private String retrieveFlinkUserJar(FlinkEnv flinkEnv, Application app) {
     switch (app.getDevelopmentMode()) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
index 76220dd..d6d3c90 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
@@ -102,60 +102,70 @@
     // flink sql job
     ApplicationConfig latestConfig = getLatest(appParam.getId());
     if (appParam.isFlinkSqlJob()) {
-      // get effect config
-      ApplicationConfig effectiveConfig = getEffective(appParam.getId());
-      if (Utils.isEmpty(appParam.getConfig())) {
-        if (effectiveConfig != null) {
-          effectiveService.remove(appParam.getId(), EffectiveTypeEnum.CONFIG);
+      updateForFlinkSqlJob(appParam, latest, latestConfig);
+    } else {
+      updateForNonFlinkSqlJob(appParam, latest, latestConfig);
+    }
+  }
+
+  private void updateForNonFlinkSqlJob(
+      Application appParam, Boolean latest, ApplicationConfig latestConfig) {
+    // may be re-selected a config file (without config id), or may be based on an original edit
+    // (with config Id).
+    Long configId = appParam.getConfigId();
+    // an original edit
+    if (configId != null) {
+      ApplicationConfig config = this.getById(configId);
+      String decode = new String(Base64.getDecoder().decode(appParam.getConfig()));
+      String encode = DeflaterUtils.zipString(decode.trim());
+      // create...
+      if (!config.getContent().equals(encode)) {
+        if (latestConfig != null) {
+          removeById(latestConfig.getId());
         }
+        this.create(appParam, latest);
       } else {
-        // there was no configuration before, is a new configuration
-        if (effectiveConfig == null) {
-          if (latestConfig != null) {
-            removeById(latestConfig.getId());
-          }
-          this.create(appParam, latest);
-        } else {
-          String decode = new String(Base64.getDecoder().decode(appParam.getConfig()));
-          String encode = DeflaterUtils.zipString(decode.trim());
-          // need to diff the two configs are consistent
-          if (!effectiveConfig.getContent().equals(encode)) {
-            if (latestConfig != null) {
-              removeById(latestConfig.getId());
-            }
-            this.create(appParam, latest);
-          }
-        }
+        this.setLatestOrEffective(latest, configId, appParam.getId());
       }
     } else {
-      // may be re-selected a config file (without config id), or may be based on an original edit
-      // (with config Id).
-      Long configId = appParam.getConfigId();
-      // an original edit
-      if (configId != null) {
-        ApplicationConfig config = this.getById(configId);
+      ApplicationConfig config = getEffective(appParam.getId());
+      if (config != null) {
         String decode = new String(Base64.getDecoder().decode(appParam.getConfig()));
         String encode = DeflaterUtils.zipString(decode.trim());
         // create...
         if (!config.getContent().equals(encode)) {
+          this.create(appParam, latest);
+        }
+      } else {
+        this.create(appParam, latest);
+      }
+    }
+  }
+
+  private void updateForFlinkSqlJob(
+      Application appParam, Boolean latest, ApplicationConfig latestConfig) {
+    // get effect config
+    ApplicationConfig effectiveConfig = getEffective(appParam.getId());
+    if (Utils.isEmpty(appParam.getConfig())) {
+      if (effectiveConfig != null) {
+        effectiveService.remove(appParam.getId(), EffectiveTypeEnum.CONFIG);
+      }
+    } else {
+      // there was no configuration before, is a new configuration
+      if (effectiveConfig == null) {
+        if (latestConfig != null) {
+          removeById(latestConfig.getId());
+        }
+        this.create(appParam, latest);
+      } else {
+        String decode = new String(Base64.getDecoder().decode(appParam.getConfig()));
+        String encode = DeflaterUtils.zipString(decode.trim());
+        // need to diff the two configs are consistent
+        if (!effectiveConfig.getContent().equals(encode)) {
           if (latestConfig != null) {
             removeById(latestConfig.getId());
           }
           this.create(appParam, latest);
-        } else {
-          this.setLatestOrEffective(latest, configId, appParam.getId());
-        }
-      } else {
-        ApplicationConfig config = getEffective(appParam.getId());
-        if (config != null) {
-          String decode = new String(Base64.getDecoder().decode(appParam.getConfig()));
-          String encode = DeflaterUtils.zipString(decode.trim());
-          // create...
-          if (!config.getContent().equals(encode)) {
-            this.create(appParam, latest);
-          }
-        } else {
-          this.create(appParam, latest);
         }
       }
     }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 6587436..ea80ed2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -215,24 +215,10 @@
     flinkCluster.setAlertId(paramOfCluster.getAlertId());
     flinkCluster.setDescription(paramOfCluster.getDescription());
     if (FlinkExecutionMode.isRemoteMode(flinkCluster.getFlinkExecutionModeEnum())) {
-      flinkCluster.setAddress(paramOfCluster.getAddress());
-      flinkCluster.setClusterState(ClusterState.RUNNING.getState());
-      flinkCluster.setStartTime(new Date());
-      flinkCluster.setEndTime(null);
+      updateFlinkClusterForRemoteMode(paramOfCluster, flinkCluster);
       FlinkClusterWatcher.addWatching(flinkCluster);
     } else {
-      flinkCluster.setClusterId(paramOfCluster.getClusterId());
-      flinkCluster.setVersionId(paramOfCluster.getVersionId());
-      flinkCluster.setDynamicProperties(paramOfCluster.getDynamicProperties());
-      flinkCluster.setOptions(paramOfCluster.getOptions());
-      flinkCluster.setResolveOrder(paramOfCluster.getResolveOrder());
-      flinkCluster.setK8sHadoopIntegration(paramOfCluster.getK8sHadoopIntegration());
-      flinkCluster.setK8sConf(paramOfCluster.getK8sConf());
-      flinkCluster.setK8sNamespace(paramOfCluster.getK8sNamespace());
-      flinkCluster.setK8sRestExposedType(paramOfCluster.getK8sRestExposedType());
-      flinkCluster.setServiceAccount(paramOfCluster.getServiceAccount());
-      flinkCluster.setFlinkImage(paramOfCluster.getFlinkImage());
-      flinkCluster.setYarnQueue(paramOfCluster.getYarnQueue());
+      updateFlinkClusterForNonRemoteModes(paramOfCluster, flinkCluster);
     }
     if (shouldWatchForK8s(flinkCluster)) {
       flinkK8sObserver.watchFlinkCluster(flinkCluster);
@@ -240,6 +226,30 @@
     updateById(flinkCluster);
   }
 
+  private void updateFlinkClusterForNonRemoteModes(
+      FlinkCluster paramOfCluster, FlinkCluster flinkCluster) {
+    flinkCluster.setClusterId(paramOfCluster.getClusterId());
+    flinkCluster.setVersionId(paramOfCluster.getVersionId());
+    flinkCluster.setDynamicProperties(paramOfCluster.getDynamicProperties());
+    flinkCluster.setOptions(paramOfCluster.getOptions());
+    flinkCluster.setResolveOrder(paramOfCluster.getResolveOrder());
+    flinkCluster.setK8sHadoopIntegration(paramOfCluster.getK8sHadoopIntegration());
+    flinkCluster.setK8sConf(paramOfCluster.getK8sConf());
+    flinkCluster.setK8sNamespace(paramOfCluster.getK8sNamespace());
+    flinkCluster.setK8sRestExposedType(paramOfCluster.getK8sRestExposedType());
+    flinkCluster.setServiceAccount(paramOfCluster.getServiceAccount());
+    flinkCluster.setFlinkImage(paramOfCluster.getFlinkImage());
+    flinkCluster.setYarnQueue(paramOfCluster.getYarnQueue());
+  }
+
+  private void updateFlinkClusterForRemoteMode(
+      FlinkCluster paramOfCluster, FlinkCluster flinkCluster) {
+    flinkCluster.setAddress(paramOfCluster.getAddress());
+    flinkCluster.setClusterState(ClusterState.RUNNING.getState());
+    flinkCluster.setStartTime(new Date());
+    flinkCluster.setEndTime(null);
+  }
+
   @Override
   public void shutdown(FlinkCluster cluster) {
     FlinkCluster flinkCluster = this.getById(cluster.getId());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 092a70d..cdb1859 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -116,15 +116,7 @@
     ApiAlertException.throwIfFalse(
         !project.getBuildState().equals(BuildStateEnum.BUILDING.get()),
         "The project is being built, update project failed.");
-    project.setName(projectParam.getName());
-    project.setUrl(projectParam.getUrl());
-    project.setBranches(projectParam.getBranches());
-    project.setPrvkeyPath(projectParam.getPrvkeyPath());
-    project.setUserName(projectParam.getUserName());
-    project.setPassword(projectParam.getPassword());
-    project.setPom(projectParam.getPom());
-    project.setDescription(projectParam.getDescription());
-    project.setBuildArgs(projectParam.getBuildArgs());
+    updateInternal(projectParam, project);
     if (project.isSshRepositoryUrl()) {
       project.setUserName(null);
     } else {
@@ -148,6 +140,18 @@
     return true;
   }
 
+  private static void updateInternal(Project projectParam, Project project) {
+    project.setName(projectParam.getName());
+    project.setUrl(projectParam.getUrl());
+    project.setBranches(projectParam.getBranches());
+    project.setPrvkeyPath(projectParam.getPrvkeyPath());
+    project.setUserName(projectParam.getUserName());
+    project.setPassword(projectParam.getPassword());
+    project.setPom(projectParam.getPom());
+    project.setDescription(projectParam.getDescription());
+    project.setBuildArgs(projectParam.getBuildArgs());
+  }
+
   @Override
   public boolean removeById(Long id) {
     Project project = getById(id);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index 29e6e77..f2f2ef3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -129,39 +129,18 @@
     Dependency dependency = Dependency.toDependency(resourceStr);
     List<String> jars = dependency.getJar();
     List<MavenPom> poms = dependency.getPom();
-
-    ApiAlertException.throwIfTrue(
-        jars.isEmpty() && poms.isEmpty(), "Please add pom or jar resource.");
-
-    ApiAlertException.throwIfTrue(
-        resource.getResourceType() == ResourceTypeEnum.FLINK_APP && jars.isEmpty(),
-        "Please upload jar for Flink_App resource");
-
-    ApiAlertException.throwIfTrue(
-        jars.size() + poms.size() > 1, "Please do not add multi dependency at one time.");
+    check(resource, jars, poms);
 
     if (resource.getResourceType() == ResourceTypeEnum.CONNECTOR) {
-      String connector = resource.getConnector();
-      ApiAlertException.throwIfTrue(connector == null, "the flink connector is null.");
-      FlinkConnector connectorResource = JacksonUtils.read(connector, FlinkConnector.class);
-      resource.setResourceName(connectorResource.getFactoryIdentifier());
-      Optional.ofNullable(connectorResource.getRequiredOptions())
-          .ifPresent(
-              v ->
-                  resource.setConnectorRequiredOptions(
-                      ExceptionUtils.wrapRuntimeException(v, JacksonUtils::write)));
-      Optional.ofNullable(connectorResource.getOptionalOptions())
-          .ifPresent(
-              v ->
-                  resource.setConnectorOptionalOptions(
-                      ExceptionUtils.wrapRuntimeException(v, JacksonUtils::write)));
+      processConnectorResource(resource);
     } else {
       ApiAlertException.throwIfNull(resource.getResourceName(), "The resourceName is required.");
     }
 
-    ApiAlertException.throwIfTrue(
-        this.findByResourceName(resource.getTeamId(), resource.getResourceName()) != null,
-        String.format("the resource %s already exists, please check.", resource.getResourceName()));
+    ApiAlertException.throwIfNotNull(
+        this.findByResourceName(resource.getTeamId(), resource.getResourceName()),
+        "the resource %s already exists, please check.",
+        resource.getResourceName());
 
     if (!jars.isEmpty()) {
       String resourcePath = jars.get(0);
@@ -175,6 +154,33 @@
     this.save(resource);
   }
 
+  private static void processConnectorResource(Resource resource) throws JsonProcessingException {
+    String connector = resource.getConnector();
+    ApiAlertException.throwIfNull(connector, "the flink connector is null.");
+    FlinkConnector connectorResource = JacksonUtils.read(connector, FlinkConnector.class);
+    resource.setResourceName(connectorResource.getFactoryIdentifier());
+    Optional.ofNullable(connectorResource.getRequiredOptions())
+        .ifPresent(
+            v ->
+                resource.setConnectorRequiredOptions(
+                    ExceptionUtils.wrapRuntimeException(v, JacksonUtils::write)));
+    Optional.ofNullable(connectorResource.getOptionalOptions())
+        .ifPresent(
+            v ->
+                resource.setConnectorOptionalOptions(
+                    ExceptionUtils.wrapRuntimeException(v, JacksonUtils::write)));
+  }
+
+  private void check(Resource resource, List<String> jars, List<MavenPom> poms) {
+    ApiAlertException.throwIfTrue(
+        jars.isEmpty() && poms.isEmpty(), "Please add pom or jar resource.");
+    ApiAlertException.throwIfTrue(
+        resource.getResourceType() == ResourceTypeEnum.FLINK_APP && jars.isEmpty(),
+        "Please upload jar for Flink_App resource");
+    ApiAlertException.throwIfTrue(
+        jars.size() + poms.size() > 1, "Please do not add multi dependency at one time.");
+  }
+
   @Override
   public Resource findByResourceName(Long teamId, String name) {
     LambdaQueryWrapper<Resource> queryWrapper =
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 7b303f0..2a570ba 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -58,6 +58,7 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.annotations.VisibleForTesting;
 import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.NotNull;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
@@ -161,15 +162,7 @@
   public void trigger(Long appId, @Nullable String savepointPath, @Nullable Boolean nativeFormat) {
     log.info("Start to trigger savepoint for app {}", appId);
     Application application = applicationManageService.getById(appId);
-
-    ApplicationLog applicationLog = new ApplicationLog();
-    applicationLog.setOptionName(OperationEnum.SAVEPOINT.getValue());
-    applicationLog.setAppId(application.getId());
-    applicationLog.setJobManagerUrl(application.getJobManagerUrl());
-    applicationLog.setOptionTime(new Date());
-    applicationLog.setYarnAppId(application.getClusterId());
-    applicationLog.setUserId(commonService.getUserId());
-
+    ApplicationLog applicationLog = getApplicationLog(application);
     FlinkAppHttpWatcher.addSavepoint(application.getId());
 
     application.setOptionState(OptionStateEnum.SAVEPOINTING.getValue());
@@ -189,6 +182,18 @@
     handleSavepointResponseFuture(application, applicationLog, savepointFuture);
   }
 
+  @NotNull
+  private ApplicationLog getApplicationLog(Application application) {
+    ApplicationLog applicationLog = new ApplicationLog();
+    applicationLog.setOptionName(OperationEnum.SAVEPOINT.getValue());
+    applicationLog.setAppId(application.getId());
+    applicationLog.setJobManagerUrl(application.getJobManagerUrl());
+    applicationLog.setOptionTime(new Date());
+    applicationLog.setYarnAppId(application.getClusterId());
+    applicationLog.setUserId(commonService.getUserId());
+    return applicationLog;
+  }
+
   @Override
   public Boolean remove(Long id, Application appParam) throws InternalException {
     SavePoint savePoint = getById(id);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java
index f891c44..291f1c7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java
@@ -98,13 +98,24 @@
     String host = remoteURI.getHost();
     String port = String.valueOf(remoteURI.getPort());
     String clusterId = flinkCluster.getClusterId();
-
     FlinkExecutionMode executionModeEnum = FlinkExecutionMode.of(flinkCluster.getExecutionMode());
-    if (executionModeEnum == null) {
-      throw new IllegalArgumentException("executionMode is null");
-    }
 
     streamParkConf.put("execution.target", executionModeEnum.getName());
+    renderConfByFlinkExecutionMode(
+        executionModeEnum, streamParkConf, host, port, clusterId, flinkCluster);
+
+    return sqlGateWayService.openSession(
+        new SessionEnvironment(
+            flinkGatewayId + flinkClusterId + UUID.randomUUID().toString(), null, streamParkConf));
+  }
+
+  private void renderConfByFlinkExecutionMode(
+      FlinkExecutionMode executionModeEnum,
+      Map<String, String> streamParkConf,
+      String host,
+      String port,
+      String clusterId,
+      FlinkCluster flinkCluster) {
     switch (Objects.requireNonNull(executionModeEnum)) {
       case REMOTE:
         streamParkConf.put("rest.address", host);
@@ -135,10 +146,6 @@
       default:
         throw new IllegalArgumentException("Unsupported execution mode: " + executionModeEnum);
     }
-
-    return sqlGateWayService.openSession(
-        new SessionEnvironment(
-            flinkGatewayId + flinkClusterId + UUID.randomUUID().toString(), null, streamParkConf));
   }
 
   @Override
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index b4bf9c8..2073d3b 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -238,10 +238,47 @@
    */
   private void getStateFromFlink(Application application) throws Exception {
     JobsOverview jobsOverview = httpJobsOverview(application);
+    Optional<JobsOverview.Job> jobOptional = getJobsOverviewJob(application, jobsOverview);
+    if (jobOptional.isPresent()) {
+
+      processJobState(application, jobOptional);
+    }
+  }
+
+  private void processJobState(Application application, Optional<JobsOverview.Job> jobOptional)
+      throws Exception {
+    JobsOverview.Job jobOverview = jobOptional.get();
+    FlinkAppStateEnum currentState = FlinkAppStateEnum.of(jobOverview.getState());
+
+    if (FlinkAppStateEnum.OTHER != currentState) {
+      try {
+        // 1) set info from JobOverview
+        handleJobOverview(application, jobOverview);
+      } catch (Exception e) {
+        log.error("get flink jobOverview error: {}", e.getMessage(), e);
+      }
+      try {
+        // 2) CheckPoints
+        handleCheckPoints(application);
+      } catch (Exception e) {
+        log.error("get flink jobOverview error: {}", e.getMessage(), e);
+      }
+      // 3) savePoint obsolete check and NEED_START check
+      OptionStateEnum optionStateEnum = OPTIONING.get(application.getId());
+      if (FlinkAppStateEnum.RUNNING == currentState) {
+        handleRunningState(application, optionStateEnum, currentState);
+      } else {
+        handleNotRunState(application, optionStateEnum, currentState);
+      }
+    }
+  }
+
+  @Nonnull
+  private Optional<JobsOverview.Job> getJobsOverviewJob(
+      Application application, JobsOverview jobsOverview) {
     Optional<JobsOverview.Job> optional;
     FlinkExecutionMode execMode = application.getFlinkExecutionMode();
-    if (FlinkExecutionMode.YARN_APPLICATION == execMode
-        || FlinkExecutionMode.YARN_PER_JOB == execMode) {
+    if (FlinkExecutionMode.isYarnPerJobOrAppMode(execMode)) {
       optional =
           !jobsOverview.getJobs().isEmpty()
               ? jobsOverview.getJobs().stream()
@@ -254,33 +291,7 @@
               .filter(x -> x.getId().equals(application.getJobId()))
               .findFirst();
     }
-    if (optional.isPresent()) {
-
-      JobsOverview.Job jobOverview = optional.get();
-      FlinkAppStateEnum currentState = FlinkAppStateEnum.of(jobOverview.getState());
-
-      if (FlinkAppStateEnum.OTHER != currentState) {
-        try {
-          // 1) set info from JobOverview
-          handleJobOverview(application, jobOverview);
-        } catch (Exception e) {
-          log.error("get flink jobOverview error: {}", e.getMessage(), e);
-        }
-        try {
-          // 2) CheckPoints
-          handleCheckPoints(application);
-        } catch (Exception e) {
-          log.error("get flink jobOverview error: {}", e.getMessage(), e);
-        }
-        // 3) savePoint obsolete check and NEED_START check
-        OptionStateEnum optionStateEnum = OPTIONING.get(application.getId());
-        if (FlinkAppStateEnum.RUNNING == currentState) {
-          handleRunningState(application, optionStateEnum, currentState);
-        } else {
-          handleNotRunState(application, optionStateEnum, currentState);
-        }
-      }
-    }
+    return optional;
   }
 
   /**
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/SsoShiroPlugin.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/SsoShiroPlugin.java
index 326f1a6..94ab68c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/SsoShiroPlugin.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/SsoShiroPlugin.java
@@ -60,6 +60,20 @@
     shiroService.addRealm(new Pac4jRealm());
 
     // Construct the shiro filter for SSO
+    constructShiroFilterForSSO();
+
+    // Construct the filterChainDefinitionMap for SSO
+    LinkedHashMap<String, String> filterChainDefinitionMap = new LinkedHashMap<>();
+    filterChainDefinitionMap.put("/sso/signin", "ssoSecurityFilter");
+    filterChainDefinitionMap.put("/sso/token", "ssoSecurityFilter");
+    filterChainDefinitionMap.put("/pac4jLogout", "ssoLogoutFilter");
+    // Get callback endpoint from callbackUrl
+    String callbackEndpoint = URI.create(ssoConfig.getClients().getCallbackUrl()).getPath();
+    filterChainDefinitionMap.put(callbackEndpoint, "ssoCallbackFilter");
+    shiroService.addFilterChains(filterChainDefinitionMap);
+  }
+
+  private void constructShiroFilterForSSO() {
     SecurityFilter securityFilter = new SecurityFilter();
     CallbackFilter callbackFilter = new CallbackFilter();
     LogoutFilter logoutFilter = new LogoutFilter();
@@ -72,15 +86,5 @@
     filters.put("ssoCallbackFilter", callbackFilter);
     filters.put("ssoLogoutFilter", logoutFilter);
     shiroService.addFilters(filters);
-
-    // Construct the filterChainDefinitionMap for SSO
-    LinkedHashMap<String, String> filterChainDefinitionMap = new LinkedHashMap<>();
-    filterChainDefinitionMap.put("/sso/signin", "ssoSecurityFilter");
-    filterChainDefinitionMap.put("/sso/token", "ssoSecurityFilter");
-    filterChainDefinitionMap.put("/pac4jLogout", "ssoLogoutFilter");
-    // Get callback endpoint from callbackUrl
-    String callbackEndpoint = URI.create(ssoConfig.getClients().getCallbackUrl()).getPath();
-    filterChainDefinitionMap.put(callbackEndpoint, "ssoCallbackFilter");
-    shiroService.addFilterChains(filterChainDefinitionMap);
   }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/AuthenticatorImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/AuthenticatorImpl.java
index 44e1eb9..2269b9e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/AuthenticatorImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/AuthenticatorImpl.java
@@ -64,7 +64,8 @@
 
     ApiAlertException.throwIfTrue(
         user.getLoginType() != LoginTypeEnum.PASSWORD,
-        String.format("user [%s] can not login with PASSWORD", username));
+        "user [%s] can not login with PASSWORD",
+        username);
 
     String salt = user.getSalt();
     password = ShaHashUtils.encrypt(salt, password);
@@ -86,7 +87,9 @@
     if (user != null) {
       ApiAlertException.throwIfTrue(
           user.getLoginType() != LoginTypeEnum.LDAP,
-          String.format("user [%s] can only sign in with %s", username, user.getLoginType()));
+          "user [%s] can only sign in with %s",
+          username,
+          user.getLoginType());
 
       return user;
     }
@@ -100,7 +103,9 @@
     if (user != null) {
       ApiAlertException.throwIfTrue(
           user.getLoginType() != LoginTypeEnum.SSO,
-          String.format("user [%s] can only sign in with %s", username, user.getLoginType()));
+          "user [%s] can only sign in with %s",
+          username,
+          user.getLoginType());
       return user;
     }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/LdapService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/LdapService.java
index e8215a1..45a2a73 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/LdapService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/LdapService.java
@@ -73,10 +73,47 @@
    * @return boolean ldapLoginStatus
    */
   public boolean ldapLogin(String userId, String userPwd) {
-
     ApiAlertException.throwIfFalse(
         enable, "ldap is not enabled, Please check the configuration: ldap.enable");
+    renderLdapEnv();
+    try {
+      NamingEnumeration<SearchResult> results = getSearchResults(userId);
+      if (!results.hasMore()) {
+        return false;
+      }
+      SearchResult result = results.next();
+      NamingEnumeration<? extends Attribute> attrs = result.getAttributes().getAll();
+      while (attrs.hasMore()) {
+        ldapEnv.put(Context.SECURITY_PRINCIPAL, result.getNameInNamespace());
+        ldapEnv.put(Context.SECURITY_CREDENTIALS, userPwd);
+        try {
+          new InitialDirContext(ldapEnv);
+        } catch (Exception e) {
+          log.warn("invalid ldap credentials or ldap search error", e);
+          return false;
+        }
+        Attribute attr = attrs.next();
+        if (attr.getID().equals(ldapUserIdentifyingAttribute)) {
+          return true;
+        }
+      }
+    } catch (NamingException e) {
+      log.error("ldap search error", e);
+    }
+    return false;
+  }
 
+  private NamingEnumeration<SearchResult> getSearchResults(String userId) throws NamingException {
+    LdapContext ctx = new InitialLdapContext(ldapEnv, null);
+    SearchControls sc = new SearchControls();
+    sc.setReturningAttributes(new String[] {ldapUserIdentifyingAttribute});
+    sc.setSearchScope(SearchControls.SUBTREE_SCOPE);
+    EqualsFilter filter = new EqualsFilter(ldapUserIdentifyingAttribute, userId);
+    NamingEnumeration<SearchResult> results = ctx.search(ldapBaseDn, filter.toString(), sc);
+    return results;
+  }
+
+  private void renderLdapEnv() {
     if (ldapEnv == null) {
       ldapEnv = new Properties();
       ldapEnv.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
@@ -86,36 +123,5 @@
 
     ldapEnv.put(Context.SECURITY_PRINCIPAL, ldapSecurityPrincipal);
     ldapEnv.put(Context.SECURITY_CREDENTIALS, ldapPrincipalPassword);
-
-    try {
-      LdapContext ctx = new InitialLdapContext(ldapEnv, null);
-      SearchControls sc = new SearchControls();
-      sc.setReturningAttributes(new String[] {ldapUserIdentifyingAttribute});
-      sc.setSearchScope(SearchControls.SUBTREE_SCOPE);
-      EqualsFilter filter = new EqualsFilter(ldapUserIdentifyingAttribute, userId);
-      NamingEnumeration<SearchResult> results = ctx.search(ldapBaseDn, filter.toString(), sc);
-      if (results.hasMore()) {
-        SearchResult result = results.next();
-        NamingEnumeration<? extends Attribute> attrs = result.getAttributes().getAll();
-        while (attrs.hasMore()) {
-          ldapEnv.put(Context.SECURITY_PRINCIPAL, result.getNameInNamespace());
-          ldapEnv.put(Context.SECURITY_CREDENTIALS, userPwd);
-          try {
-            new InitialDirContext(ldapEnv);
-          } catch (Exception e) {
-            log.warn("invalid ldap credentials or ldap search error", e);
-            return false;
-          }
-          Attribute attr = attrs.next();
-          if (attr.getID().equals(ldapUserIdentifyingAttribute)) {
-            return true;
-          }
-        }
-      }
-    } catch (NamingException e) {
-      log.error("ldap search error", e);
-      return false;
-    }
-    return false;
   }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
index 90f7975..e2154d0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
@@ -42,7 +42,6 @@
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 @Service
@@ -119,28 +118,18 @@
 
   @Override
   public void createMember(Member member) {
-    User user =
-        Optional.ofNullable(userService.getByUsername(member.getUserName()))
-            .orElseThrow(
-                () ->
-                    new ApiAlertException(
-                        String.format("The username [%s] not found", member.getUserName())));
-    Optional.ofNullable(roleService.getById(member.getRoleId()))
-        .orElseThrow(
-            () ->
-                new ApiAlertException(
-                    String.format("The roleId [%s] not found", member.getRoleId())));
-    Team team =
-        Optional.ofNullable(teamService.getById(member.getTeamId()))
-            .orElseThrow(
-                () ->
-                    new ApiAlertException(
-                        String.format("The teamId [%s] not found", member.getTeamId())));
-    ApiAlertException.throwIfFalse(
-        findByUserId(member.getTeamId(), user.getUserId()) == null,
-        String.format(
-            "The user [%s] has been added the team [%s], please don't add it again.",
-            member.getUserName(), team.getTeamName()));
+    User user = userService.getByUsername(member.getUserName());
+    ApiAlertException.throwIfNull(user, "The username [%s] not found", member.getUserName());
+
+    ApiAlertException.throwIfNull(
+        roleService.getById(member.getRoleId()), "The roleId [%s] not found", member.getRoleId());
+    Team team = teamService.getById(member.getTeamId());
+    ApiAlertException.throwIfNull(team, "The teamId [%s] not found", member.getTeamId());
+    ApiAlertException.throwIfNotNull(
+        findByUserId(member.getTeamId(), user.getUserId()),
+        "The user [%s] has been added the team [%s], please don't add it again.",
+        member.getUserName(),
+        team.getTeamName());
 
     member.setId(null);
     member.setUserId(user.getUserId());
@@ -153,29 +142,20 @@
 
   @Override
   public void remove(Long id) {
-    Member member =
-        Optional.ofNullable(this.getById(id))
-            .orElseThrow(
-                () -> new ApiAlertException(String.format("The member [id=%s] not found", id)));
+    Member member = this.getById(id);
+    ApiAlertException.throwIfNull(member, "The member [id=%s] not found", id);
     this.removeById(member);
     userService.clearLastTeam(member.getUserId(), member.getTeamId());
   }
 
   @Override
   public void updateMember(Member member) {
-    Member oldMember =
-        Optional.ofNullable(this.getById(member.getId()))
-            .orElseThrow(
-                () ->
-                    new ApiAlertException(
-                        String.format("The member [id=%s] not found", member.getId())));
+    Member oldMember = this.getById(member.getId());
+    ApiAlertException.throwIfNull(oldMember, "The member [id=%s] not found", member.getId());
     Utils.required(oldMember.getTeamId().equals(member.getTeamId()), "Team id cannot be changed.");
     Utils.required(oldMember.getUserId().equals(member.getUserId()), "User id cannot be changed.");
-    Optional.ofNullable(roleService.getById(member.getRoleId()))
-        .orElseThrow(
-            () ->
-                new ApiAlertException(
-                    String.format("The roleId [%s] not found", member.getRoleId())));
+    ApiAlertException.throwIfNull(
+        roleService.getById(member.getRoleId()), "The roleId [%s] not found", member.getRoleId());
     oldMember.setRoleId(member.getRoleId());
     updateById(oldMember);
   }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java
index d7079a3..fea8bb8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java
@@ -96,22 +96,22 @@
     log.info("{} Proceed delete team[Id={}]", commonService.getCurrentUser().getUsername(), teamId);
     Team team = this.getById(teamId);
 
-    ApiAlertException.throwIfNull(team, String.format("The team[Id=%s] doesn't exist.", teamId));
+    ApiAlertException.throwIfNull(team, "The team[Id=%s] doesn't exist.", teamId);
 
     ApiAlertException.throwIfTrue(
         applicationInfoService.existsByTeamId(teamId),
-        String.format(
-            "Please delete the applications under the team[name=%s] first!", team.getTeamName()));
+        "Please delete the applications under the team[name=%s] first!",
+        team.getTeamName());
 
     ApiAlertException.throwIfTrue(
         projectService.existsByTeamId(teamId),
-        String.format(
-            "Please delete the projects under the team[name=%s] first!", team.getTeamName()));
+        "Please delete the projects under the team[name=%s] first!",
+        team.getTeamName());
 
     ApiAlertException.throwIfTrue(
         variableService.existsByTeamId(teamId),
-        String.format(
-            "Please delete the variables under the team[name=%s] first!", team.getTeamName()));
+        "Please delete the variables under the team[name=%s] first!",
+        team.getTeamName());
 
     memberService.removeByTeamId(teamId);
     userService.clearLastTeam(teamId);