[improve] improve resource management logic (#3439)
Co-authored-by: monrg <wangmonrg@gmail.com>
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java b/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
index 40deb57..22d93e5 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
@@ -48,4 +48,18 @@
return e.getClass().getName() + " (error while printing stack trace)";
}
}
+
+ @FunctionalInterface
+ public interface WrapperRuntimeExceptionHandler<I, O> {
+ O handle(I input) throws Exception;
+ }
+
+ public static <I, O> O wrapRuntimeException(
+ I input, WrapperRuntimeExceptionHandler<I, O> handler) {
+ try {
+ return handler.handle(input);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index f8e3a21..fcba212 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -96,6 +96,14 @@
new JarInputStream(new BufferedInputStream(new FileInputStream(jarFile))).getManifest
}
+ def getJarManClass(jarFile: File): String = {
+ val manifest = getJarManifest(jarFile)
+ manifest.getMainAttributes.getValue("Main-Class") match {
+ case null => manifest.getMainAttributes.getValue("program-class")
+ case v => v
+ }
+ }
+
def copyProperties(original: Properties, target: Properties): Unit =
original.foreach(x => target.put(x._1, x._2))
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 95883a0..36a1d05 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -78,7 +78,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.jar.Manifest;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -498,8 +497,7 @@
project.getDistHome().getAbsolutePath().concat("/").concat(appParam.getModule());
jarFile = new File(modulePath, appParam.getJar());
}
- Manifest manifest = Utils.getJarManifest(jarFile);
- return manifest.getMainAttributes().getValue("Main-Class");
+ return Utils.getJarManClass(jarFile);
}
@Override
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index b89e57a..b8df85f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -45,10 +45,10 @@
import org.apache.streampark.flink.packer.maven.MavenTool;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.factories.Factory;
-import org.apache.hadoop.shaded.org.apache.commons.codec.digest.DigestUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
@@ -56,6 +56,7 @@
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -68,7 +69,6 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
-import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.time.Duration;
@@ -77,12 +77,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Scanner;
import java.util.ServiceLoader;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
-import java.util.jar.Manifest;
import java.util.stream.Collectors;
@Slf4j
@@ -95,7 +95,9 @@
public static final String EXCEPTION = "exception";
@Autowired private ApplicationManageService applicationManageService;
+
@Autowired private CommonService commonService;
+
@Autowired private FlinkSqlService flinkSqlService;
@Override
@@ -138,26 +140,28 @@
ApiAlertException.throwIfTrue(
jars.size() + poms.size() > 1, "Please do not add multi dependency at one time.");
- if (resource.getResourceType() != ResourceTypeEnum.CONNECTOR) {
- ApiAlertException.throwIfNull(resource.getResourceName(), "The resourceName is required.");
- } else {
+ if (resource.getResourceType() == ResourceTypeEnum.CONNECTOR) {
String connector = resource.getConnector();
ApiAlertException.throwIfTrue(connector == null, "the flink connector is null.");
FlinkConnector connectorResource = JacksonUtils.read(connector, FlinkConnector.class);
resource.setResourceName(connectorResource.getFactoryIdentifier());
- if (connectorResource.getRequiredOptions() != null) {
- resource.setConnectorRequiredOptions(
- JacksonUtils.write(connectorResource.getRequiredOptions()));
- }
- if (connectorResource.getOptionalOptions() != null) {
- resource.setConnectorOptionalOptions(
- JacksonUtils.write(connectorResource.getOptionalOptions()));
- }
+ Optional.ofNullable(connectorResource.getRequiredOptions())
+ .ifPresent(
+ v ->
+ resource.setConnectorRequiredOptions(
+ ExceptionUtils.wrapRuntimeException(v, JacksonUtils::write)));
+ Optional.ofNullable(connectorResource.getOptionalOptions())
+ .ifPresent(
+ v ->
+ resource.setConnectorOptionalOptions(
+ ExceptionUtils.wrapRuntimeException(v, JacksonUtils::write)));
+ } else {
+ ApiAlertException.throwIfNull(resource.getResourceName(), "The resourceName is required.");
}
ApiAlertException.throwIfTrue(
this.findByResourceName(resource.getTeamId(), resource.getResourceName()) != null,
- String.format("Sorry, the resource %s already exists.", resource.getResourceName()));
+ String.format("the resource %s already exists, please check.", resource.getResourceName()));
if (!jars.isEmpty()) {
String resourcePath = jars.get(0);
@@ -251,15 +255,8 @@
@Override
public String upload(MultipartFile file) throws IOException {
File temp = WebUtils.getAppTempDir();
-
- String name = file.getOriginalFilename();
- String suffix = name.substring(name.lastIndexOf("."));
-
- String sha256Hex = DigestUtils.sha256Hex(file.getInputStream());
- String fileName = sha256Hex.concat(suffix);
-
+ String fileName = FilenameUtils.getName(Objects.requireNonNull(file.getOriginalFilename()));
File saveFile = new File(temp, fileName);
-
if (!saveFile.exists()) {
// save file to temp dir
try {
@@ -268,114 +265,93 @@
throw new ApiDetailException(e);
}
}
-
return saveFile.getAbsolutePath();
}
@Override
public RestResponse checkResource(Resource resourceParam) throws JsonProcessingException {
ResourceTypeEnum type = resourceParam.getResourceType();
- Map<String, Serializable> resp = new HashMap<>(0);
- resp.put(STATE, 0);
switch (type) {
case FLINK_APP:
- // check main.
- File jarFile;
- try {
- jarFile = getResourceJar(resourceParam);
- } catch (Exception e) {
- // get jarFile error
- resp.put(STATE, 1);
- resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
- return RestResponse.success().data(resp);
- }
- if (jarFile.getName().endsWith(Constant.PYTHON_SUFFIX)) {
- return RestResponse.success().data(resp);
- }
- Manifest manifest = Utils.getJarManifest(jarFile);
- String mainClass = manifest.getMainAttributes().getValue("Main-Class");
-
- if (mainClass == null) {
- // main class is null
- resp.put(STATE, 2);
- return RestResponse.success().data(resp);
- }
- return RestResponse.success().data(resp);
+ return checkFlinkApp(resourceParam);
case CONNECTOR:
- // 1) get connector id
- FlinkConnector connectorResource;
+ return checkConnector(resourceParam);
+ }
+ return RestResponse.success().data(ImmutableMap.of(STATE, 0));
+ }
- ApiAlertException.throwIfFalse(
- ResourceTypeEnum.CONNECTOR == resourceParam.getResourceType(),
- "getConnectorId method error, resource not flink connector.");
+ private RestResponse checkConnector(Resource resourceParam) throws JsonProcessingException {
+ // 1) get connector jar
+ FlinkConnector connectorResource;
+ List<File> jars;
+ File connector;
+ List<String> factories;
+ try {
+ File file = getResourceJar(resourceParam);
+ connector = file;
+ jars = Collections.singletonList(file);
+ } catch (Exception e) {
+ // get jarFile error
+ return buildExceptResponse(e, 1);
+ }
- List<File> jars;
- File connector = null;
- List<String> factories;
+ // 2) parse connector Factory
+ try {
+ factories = getConnectorFactory(connector);
+ } catch (Exception e) {
+ // flink connector invalid
+ return buildExceptResponse(e, 2);
+ }
- Dependency dependency = Dependency.toDependency(resourceParam.getResource());
+ // 3) get connector resource
+ connectorResource = getConnectorResource(jars, factories);
+ if (connectorResource == null) {
+ // connector is null
+ return buildExceptResponse(new RuntimeException("connector is null"), 3);
+ }
- // 1) get connector jar
- if (!dependency.getPom().isEmpty()) {
- Artifact artifact = dependency.toArtifact().get(0);
- try {
- jars = MavenTool.resolveArtifacts(artifact);
- } catch (Exception e) {
- // connector download is null
- resp.put(STATE, 1);
- resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
- return RestResponse.success().data(resp);
- }
- String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version());
- Optional<File> file = jars.stream().filter(x -> x.getName().equals(fileName)).findFirst();
- if (file.isPresent()) {
- connector = file.get();
- }
- } else {
- // 2) jar
- String jar = dependency.getJar().get(0).split(":")[1];
- File file = new File(jar);
- connector = file;
- jars = Collections.singletonList(file);
- }
+ // 2) check connector exists
+ boolean exists =
+ existsFlinkConnector(resourceParam.getId(), connectorResource.getFactoryIdentifier());
+ if (exists) {
+ return buildExceptResponse(new RuntimeException("connector is already exists"), 4);
+ }
- // 2) parse connector Factory
- try {
- factories = getConnectorFactory(connector);
- } catch (Exception e) {
- // flink connector invalid
- resp.put(STATE, 2);
- resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
- return RestResponse.success().data(resp);
- }
+ if (resourceParam.getId() != null
+ && !(getById(resourceParam.getId())
+ .getResourceName()
+ .equals(connectorResource.getFactoryIdentifier()))) {
+ return buildExceptResponse(
+ new RuntimeException("resource name different with FactoryIdentifier"), 5);
+ }
+ return RestResponse.success()
+ .data(ImmutableMap.of(STATE, 0, "connector", JacksonUtils.write(connectorResource)));
+ }
- // 3) get connector resource
- connectorResource = getConnectorResource(jars, factories);
- if (connectorResource == null) {
- // connector is null
- resp.put(STATE, 3);
- return RestResponse.success().data(resp);
- }
+ private static RestResponse buildExceptResponse(Exception e, int code) {
+ return RestResponse.success()
+ .data(ImmutableMap.of(STATE, code, EXCEPTION, ExceptionUtils.stringifyException(e)));
+ }
- // 2) check connector exists
- boolean exists =
- existsFlinkConnector(resourceParam.getId(), connectorResource.getFactoryIdentifier());
- if (exists) {
- resp.put(STATE, 4);
- resp.put("name", connectorResource.getFactoryIdentifier());
- return RestResponse.success(resp);
- }
-
- if (resourceParam.getId() != null) {
- Resource resource = getById(resourceParam.getId());
- if (!resource.getResourceName().equals(connectorResource.getFactoryIdentifier())) {
- resp.put(STATE, 5);
- return RestResponse.success().data(resp);
- }
- }
- resp.put(STATE, 0);
- resp.put("connector", JacksonUtils.write(connectorResource));
- return RestResponse.success().data(resp);
+ private RestResponse checkFlinkApp(Resource resourceParam) {
+ // check main.
+ File jarFile;
+ try {
+ jarFile = getResourceJar(resourceParam);
+ } catch (Exception e) {
+ // get jarFile error
+ return buildExceptResponse(e, 1);
+ }
+ ApiAlertException.throwIfTrue(jarFile == null, "flink app jar must exist.");
+ Map<String, Serializable> resp = new HashMap<>(0);
+ resp.put(STATE, 0);
+ if (jarFile.getName().endsWith(Constant.PYTHON_SUFFIX)) {
+ return RestResponse.success().data(resp);
+ }
+ String mainClass = Utils.getJarManClass(jarFile);
+ if (mainClass == null) {
+ // main class is null
+ return buildExceptResponse(new RuntimeException("main class is null"), 2);
}
return RestResponse.success().data(resp);
}
@@ -394,13 +370,7 @@
URL[] array =
jars.stream()
.map(
- x -> {
- try {
- return x.toURI().toURL();
- } catch (MalformedURLException e) {
- throw new RuntimeException(e);
- }
- })
+ file -> ExceptionUtils.wrapRuntimeException(file, handle -> handle.toURI().toURL()))
.toArray(URL[]::new);
try (URLClassLoader urlClassLoader = URLClassLoader.newInstance(array)) {
@@ -458,9 +428,9 @@
String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version());
Optional<File> jarFile =
files.stream().filter(x -> x.getName().equals(fileName)).findFirst();
- if (jarFile.isPresent()) {
- return jarFile.get();
- }
+ jarFile.ifPresent(
+ file -> transferTeamResource(resource.getTeamId(), file.getAbsolutePath()));
+ return jarFile.orElse(null);
}
return null;
}