[improve] improve resource management logic (#3439)

Co-authored-by: monrg <wangmonrg@gmail.com>
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java b/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
index 40deb57..22d93e5 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
@@ -48,4 +48,18 @@
       return e.getClass().getName() + " (error while printing stack trace)";
     }
   }
+
+  @FunctionalInterface
+  public interface WrapperRuntimeExceptionHandler<I, O> {
+    O handle(I input) throws Exception;
+  }
+
+  public static <I, O> O wrapRuntimeException(
+      I input, WrapperRuntimeExceptionHandler<I, O> handler) {
+    try {
+      return handler.handle(input);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index f8e3a21..fcba212 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -96,6 +96,14 @@
     new JarInputStream(new BufferedInputStream(new FileInputStream(jarFile))).getManifest
   }
 
+  def getJarManClass(jarFile: File): String = {
+    val manifest = getJarManifest(jarFile)
+    manifest.getMainAttributes.getValue("Main-Class") match {
+      case null => manifest.getMainAttributes.getValue("program-class")
+      case v => v
+    }
+  }
+
   def copyProperties(original: Properties, target: Properties): Unit =
     original.foreach(x => target.put(x._1, x._2))
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 95883a0..36a1d05 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -78,7 +78,6 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.jar.Manifest;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -498,8 +497,7 @@
           project.getDistHome().getAbsolutePath().concat("/").concat(appParam.getModule());
       jarFile = new File(modulePath, appParam.getJar());
     }
-    Manifest manifest = Utils.getJarManifest(jarFile);
-    return manifest.getMainAttributes().getValue("Main-Class");
+    return Utils.getJarManClass(jarFile);
   }
 
   @Override
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index b89e57a..b8df85f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -45,10 +45,10 @@
 import org.apache.streampark.flink.packer.maven.MavenTool;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.table.factories.Factory;
-import org.apache.hadoop.shaded.org.apache.commons.codec.digest.DigestUtils;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
@@ -56,6 +56,7 @@
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -68,7 +69,6 @@
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Serializable;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.time.Duration;
@@ -77,12 +77,12 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Scanner;
 import java.util.ServiceLoader;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
-import java.util.jar.Manifest;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -95,7 +95,9 @@
   public static final String EXCEPTION = "exception";
 
   @Autowired private ApplicationManageService applicationManageService;
+
   @Autowired private CommonService commonService;
+
   @Autowired private FlinkSqlService flinkSqlService;
 
   @Override
@@ -138,26 +140,28 @@
     ApiAlertException.throwIfTrue(
         jars.size() + poms.size() > 1, "Please do not add multi dependency at one time.");
 
-    if (resource.getResourceType() != ResourceTypeEnum.CONNECTOR) {
-      ApiAlertException.throwIfNull(resource.getResourceName(), "The resourceName is required.");
-    } else {
+    if (resource.getResourceType() == ResourceTypeEnum.CONNECTOR) {
       String connector = resource.getConnector();
       ApiAlertException.throwIfTrue(connector == null, "the flink connector is null.");
       FlinkConnector connectorResource = JacksonUtils.read(connector, FlinkConnector.class);
       resource.setResourceName(connectorResource.getFactoryIdentifier());
-      if (connectorResource.getRequiredOptions() != null) {
-        resource.setConnectorRequiredOptions(
-            JacksonUtils.write(connectorResource.getRequiredOptions()));
-      }
-      if (connectorResource.getOptionalOptions() != null) {
-        resource.setConnectorOptionalOptions(
-            JacksonUtils.write(connectorResource.getOptionalOptions()));
-      }
+      Optional.ofNullable(connectorResource.getRequiredOptions())
+          .ifPresent(
+              v ->
+                  resource.setConnectorRequiredOptions(
+                      ExceptionUtils.wrapRuntimeException(v, JacksonUtils::write)));
+      Optional.ofNullable(connectorResource.getOptionalOptions())
+          .ifPresent(
+              v ->
+                  resource.setConnectorOptionalOptions(
+                      ExceptionUtils.wrapRuntimeException(v, JacksonUtils::write)));
+    } else {
+      ApiAlertException.throwIfNull(resource.getResourceName(), "The resourceName is required.");
     }
 
     ApiAlertException.throwIfTrue(
         this.findByResourceName(resource.getTeamId(), resource.getResourceName()) != null,
-        String.format("Sorry, the resource %s already exists.", resource.getResourceName()));
+        String.format("the resource %s already exists, please check.", resource.getResourceName()));
 
     if (!jars.isEmpty()) {
       String resourcePath = jars.get(0);
@@ -251,15 +255,8 @@
   @Override
   public String upload(MultipartFile file) throws IOException {
     File temp = WebUtils.getAppTempDir();
-
-    String name = file.getOriginalFilename();
-    String suffix = name.substring(name.lastIndexOf("."));
-
-    String sha256Hex = DigestUtils.sha256Hex(file.getInputStream());
-    String fileName = sha256Hex.concat(suffix);
-
+    String fileName = FilenameUtils.getName(Objects.requireNonNull(file.getOriginalFilename()));
     File saveFile = new File(temp, fileName);
-
     if (!saveFile.exists()) {
       // save file to temp dir
       try {
@@ -268,114 +265,93 @@
         throw new ApiDetailException(e);
       }
     }
-
     return saveFile.getAbsolutePath();
   }
 
   @Override
   public RestResponse checkResource(Resource resourceParam) throws JsonProcessingException {
     ResourceTypeEnum type = resourceParam.getResourceType();
-    Map<String, Serializable> resp = new HashMap<>(0);
-    resp.put(STATE, 0);
     switch (type) {
       case FLINK_APP:
-        // check main.
-        File jarFile;
-        try {
-          jarFile = getResourceJar(resourceParam);
-        } catch (Exception e) {
-          // get jarFile error
-          resp.put(STATE, 1);
-          resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
-          return RestResponse.success().data(resp);
-        }
-        if (jarFile.getName().endsWith(Constant.PYTHON_SUFFIX)) {
-          return RestResponse.success().data(resp);
-        }
-        Manifest manifest = Utils.getJarManifest(jarFile);
-        String mainClass = manifest.getMainAttributes().getValue("Main-Class");
-
-        if (mainClass == null) {
-          // main class is null
-          resp.put(STATE, 2);
-          return RestResponse.success().data(resp);
-        }
-        return RestResponse.success().data(resp);
+        return checkFlinkApp(resourceParam);
       case CONNECTOR:
-        // 1) get connector id
-        FlinkConnector connectorResource;
+        return checkConnector(resourceParam);
+    }
+    return RestResponse.success().data(ImmutableMap.of(STATE, 0));
+  }
 
-        ApiAlertException.throwIfFalse(
-            ResourceTypeEnum.CONNECTOR == resourceParam.getResourceType(),
-            "getConnectorId method error, resource not flink connector.");
+  private RestResponse checkConnector(Resource resourceParam) throws JsonProcessingException {
+    // 1) get connector jar
+    FlinkConnector connectorResource;
+    List<File> jars;
+    File connector;
+    List<String> factories;
+    try {
+      File file = getResourceJar(resourceParam);
+      connector = file;
+      jars = Collections.singletonList(file);
+    } catch (Exception e) {
+      // get jarFile error
+      return buildExceptResponse(e, 1);
+    }
 
-        List<File> jars;
-        File connector = null;
-        List<String> factories;
+    // 2) parse connector Factory
+    try {
+      factories = getConnectorFactory(connector);
+    } catch (Exception e) {
+      // flink connector invalid
+      return buildExceptResponse(e, 2);
+    }
 
-        Dependency dependency = Dependency.toDependency(resourceParam.getResource());
+    // 3) get connector resource
+    connectorResource = getConnectorResource(jars, factories);
+    if (connectorResource == null) {
+      // connector is null
+      return buildExceptResponse(new RuntimeException("connector is null"), 3);
+    }
 
-        // 1) get connector jar
-        if (!dependency.getPom().isEmpty()) {
-          Artifact artifact = dependency.toArtifact().get(0);
-          try {
-            jars = MavenTool.resolveArtifacts(artifact);
-          } catch (Exception e) {
-            // connector download is null
-            resp.put(STATE, 1);
-            resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
-            return RestResponse.success().data(resp);
-          }
-          String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version());
-          Optional<File> file = jars.stream().filter(x -> x.getName().equals(fileName)).findFirst();
-          if (file.isPresent()) {
-            connector = file.get();
-          }
-        } else {
-          // 2) jar
-          String jar = dependency.getJar().get(0).split(":")[1];
-          File file = new File(jar);
-          connector = file;
-          jars = Collections.singletonList(file);
-        }
+    // 2) check connector exists
+    boolean exists =
+        existsFlinkConnector(resourceParam.getId(), connectorResource.getFactoryIdentifier());
+    if (exists) {
+      return buildExceptResponse(new RuntimeException("connector is already exists"), 4);
+    }
 
-        // 2) parse connector Factory
-        try {
-          factories = getConnectorFactory(connector);
-        } catch (Exception e) {
-          // flink connector invalid
-          resp.put(STATE, 2);
-          resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
-          return RestResponse.success().data(resp);
-        }
+    if (resourceParam.getId() != null
+        && !(getById(resourceParam.getId())
+            .getResourceName()
+            .equals(connectorResource.getFactoryIdentifier()))) {
+      return buildExceptResponse(
+          new RuntimeException("resource name different with FactoryIdentifier"), 5);
+    }
+    return RestResponse.success()
+        .data(ImmutableMap.of(STATE, 0, "connector", JacksonUtils.write(connectorResource)));
+  }
 
-        // 3) get connector resource
-        connectorResource = getConnectorResource(jars, factories);
-        if (connectorResource == null) {
-          // connector is null
-          resp.put(STATE, 3);
-          return RestResponse.success().data(resp);
-        }
+  private static RestResponse buildExceptResponse(Exception e, int code) {
+    return RestResponse.success()
+        .data(ImmutableMap.of(STATE, code, EXCEPTION, ExceptionUtils.stringifyException(e)));
+  }
 
-        // 2) check connector exists
-        boolean exists =
-            existsFlinkConnector(resourceParam.getId(), connectorResource.getFactoryIdentifier());
-        if (exists) {
-          resp.put(STATE, 4);
-          resp.put("name", connectorResource.getFactoryIdentifier());
-          return RestResponse.success(resp);
-        }
-
-        if (resourceParam.getId() != null) {
-          Resource resource = getById(resourceParam.getId());
-          if (!resource.getResourceName().equals(connectorResource.getFactoryIdentifier())) {
-            resp.put(STATE, 5);
-            return RestResponse.success().data(resp);
-          }
-        }
-        resp.put(STATE, 0);
-        resp.put("connector", JacksonUtils.write(connectorResource));
-        return RestResponse.success().data(resp);
+  private RestResponse checkFlinkApp(Resource resourceParam) {
+    // check main.
+    File jarFile;
+    try {
+      jarFile = getResourceJar(resourceParam);
+    } catch (Exception e) {
+      // get jarFile error
+      return buildExceptResponse(e, 1);
+    }
+    ApiAlertException.throwIfTrue(jarFile == null, "flink app jar must exist.");
+    Map<String, Serializable> resp = new HashMap<>(0);
+    resp.put(STATE, 0);
+    if (jarFile.getName().endsWith(Constant.PYTHON_SUFFIX)) {
+      return RestResponse.success().data(resp);
+    }
+    String mainClass = Utils.getJarManClass(jarFile);
+    if (mainClass == null) {
+      // main class is null
+      return buildExceptResponse(new RuntimeException("main class is null"), 2);
     }
     return RestResponse.success().data(resp);
   }
@@ -394,13 +370,7 @@
     URL[] array =
         jars.stream()
             .map(
-                x -> {
-                  try {
-                    return x.toURI().toURL();
-                  } catch (MalformedURLException e) {
-                    throw new RuntimeException(e);
-                  }
-                })
+                file -> ExceptionUtils.wrapRuntimeException(file, handle -> handle.toURI().toURL()))
             .toArray(URL[]::new);
 
     try (URLClassLoader urlClassLoader = URLClassLoader.newInstance(array)) {
@@ -458,9 +428,9 @@
         String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version());
         Optional<File> jarFile =
             files.stream().filter(x -> x.getName().equals(fileName)).findFirst();
-        if (jarFile.isPresent()) {
-          return jarFile.get();
-        }
+        jarFile.ifPresent(
+            file -> transferTeamResource(resource.getTeamId(), file.getAbsolutePath()));
+        return jarFile.orElse(null);
       }
       return null;
     }