[ISSUE-3066][console-service] Improve streampark-console module based on [3.3 Methods Rule] (#3597)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/ApiAlertException.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/ApiAlertException.java
index 03b2945..ecb1d22 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/ApiAlertException.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/exception/ApiAlertException.java
@@ -46,9 +46,21 @@
super(message, cause, ResponseCode.CODE_FAIL_ALERT);
}
- public static void throwIfNull(Object object, String errorMessage) {
+ public static void throwIfNull(Object object, String errorMsgFmt, Object... args) {
if (Objects.isNull(object)) {
- throw new ApiAlertException(errorMessage);
+ if (args == null || args.length < 1) {
+ throw new ApiAlertException(errorMsgFmt);
+ }
+ throw new ApiAlertException(String.format(errorMsgFmt, args));
+ }
+ }
+
+ public static void throwIfNotNull(Object object, String errorMsgFmt, Object... args) {
+ if (!Objects.isNull(object)) {
+ if (args == null || args.length < 1) {
+ throw new ApiAlertException(errorMsgFmt);
+ }
+ throw new ApiAlertException(String.format(errorMsgFmt, args));
}
}
@@ -58,9 +70,12 @@
}
}
- public static void throwIfTrue(boolean expression, String errorMessage) {
+ public static void throwIfTrue(boolean expression, String errorMsgFmt, Object... args) {
if (expression) {
- throw new ApiAlertException(errorMessage);
+ if (args == null || args.length < 1) {
+ throw new ApiAlertException(errorMsgFmt);
+ }
+ throw new ApiAlertException(String.format(errorMsgFmt, args));
}
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
index 5173724..f3e713e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
@@ -28,6 +28,8 @@
import org.apache.streampark.console.core.service.application.ApplicationActionService;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
+import org.apache.flink.shaded.guava30.com.google.common.base.Preconditions;
+
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.Data;
@@ -97,41 +99,51 @@
saveSavepoint(checkPoint, application.getId());
}
} else if (shouldProcessFailedTrigger(checkPoint, application.cpFailedTrigger(), status)) {
- Counter counter = checkPointFailedCache.get(appId);
- if (counter == null) {
- checkPointFailedCache.put(appId, new Counter(checkPoint.getTriggerTimestamp()));
- } else {
- long minute = counter.getDuration(checkPoint.getTriggerTimestamp());
- if (minute <= application.getCpFailureRateInterval()
- && counter.getCount() >= application.getCpMaxFailureInterval()) {
- checkPointFailedCache.remove(appId);
- FailoverStrategyEnum failoverStrategyEnum =
- FailoverStrategyEnum.of(application.getCpFailureAction());
- if (failoverStrategyEnum == null) {
- throw new IllegalArgumentException(
- "Unexpected cpFailureAction: " + application.getCpFailureAction());
- }
- switch (failoverStrategyEnum) {
- case ALERT:
- alertService.alert(
- application.getAlertId(),
- AlertTemplate.of(application, CheckPointStatusEnum.FAILED));
- break;
- case RESTART:
- try {
- applicationActionService.restart(application);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- break;
- default:
- // do nothing
- break;
- }
- } else {
- counter.increment();
+ processFailedCheckpoint(application, checkPoint, appId);
+ }
+ }
+
+ private void processFailedCheckpoint(
+ Application application, @Nonnull CheckPoints.CheckPoint checkPoint, Long appId) {
+ Counter counter = checkPointFailedCache.get(appId);
+ if (counter == null) {
+ checkPointFailedCache.put(appId, new Counter(checkPoint.getTriggerTimestamp()));
+ return;
+ }
+
+ long minute = counter.getDuration(checkPoint.getTriggerTimestamp());
+ if (minute > application.getCpFailureRateInterval()
+ || counter.getCount() < application.getCpMaxFailureInterval()) {
+ counter.increment();
+ return;
+ }
+ checkPointFailedCache.remove(appId);
+ FailoverStrategyEnum failoverStrategyEnum =
+ FailoverStrategyEnum.of(application.getCpFailureAction());
+ Preconditions.checkArgument(
+ failoverStrategyEnum != null,
+ "Unexpected cpFailureAction: %s",
+ application.getCpFailureAction());
+ processFailoverStrategy(application, failoverStrategyEnum);
+ }
+
+ private void processFailoverStrategy(
+ Application application, FailoverStrategyEnum failoverStrategyEnum) {
+ switch (failoverStrategyEnum) {
+ case ALERT:
+ alertService.alert(
+ application.getAlertId(), AlertTemplate.of(application, CheckPointStatusEnum.FAILED));
+ break;
+ case RESTART:
+ try {
+ applicationActionService.restart(application);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- }
+ break;
+ default:
+ // do nothing
+ break;
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationBackUp.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationBackUp.java
index 873588c..6a35b1c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationBackUp.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationBackUp.java
@@ -55,6 +55,10 @@
this.configId = application.getConfigId();
this.description = application.getBackUpDescription();
this.createTime = new Date();
+ renderPath(application);
+ }
+
+ private void renderPath(Application application) {
switch (application.getFlinkExecutionMode()) {
case KUBERNETES_NATIVE_APPLICATION:
case KUBERNETES_NATIVE_SESSION:
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
index a504148..970e4f5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
@@ -31,6 +31,7 @@
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.Nullable;
import java.util.Base64;
import java.util.Collections;
@@ -77,25 +78,7 @@
}
public Map<String, String> readConfig() {
- ConfigFileTypeEnum fileType = ConfigFileTypeEnum.of(this.format);
- Map<String, String> configs = null;
- if (fileType != null) {
- switch (fileType) {
- case YAML:
- configs = PropertiesUtils.fromYamlTextAsJava(DeflaterUtils.unzipString(this.content));
- break;
- case PROPERTIES:
- configs =
- PropertiesUtils.fromPropertiesTextAsJava(DeflaterUtils.unzipString(this.content));
- break;
- case HOCON:
- configs = PropertiesUtils.fromHoconTextAsJava(DeflaterUtils.unzipString(this.content));
- break;
- default:
- configs = Collections.emptyMap();
- break;
- }
- }
+ Map<String, String> configs = renderConfigs();
if (MapUtils.isNotEmpty(configs)) {
return configs.entrySet().stream()
@@ -120,4 +103,22 @@
}
return Collections.emptyMap();
}
+
+ @Nullable
+ private Map<String, String> renderConfigs() {
+ ConfigFileTypeEnum fileType = ConfigFileTypeEnum.of(this.format);
+ if (fileType == null) {
+ return null;
+ }
+ switch (fileType) {
+ case YAML:
+ return PropertiesUtils.fromYamlTextAsJava(DeflaterUtils.unzipString(this.content));
+ case PROPERTIES:
+ return PropertiesUtils.fromPropertiesTextAsJava(DeflaterUtils.unzipString(this.content));
+ case HOCON:
+ return PropertiesUtils.fromHoconTextAsJava(DeflaterUtils.unzipString(this.content));
+ default:
+ return Collections.emptyMap();
+ }
+ }
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
index 813598f..1005cb3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
@@ -28,6 +28,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.guava30.com.google.common.base.Preconditions;
import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.IdType;
@@ -39,6 +40,8 @@
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jgit.lib.Constants;
+import javax.annotation.Nonnull;
+
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
@@ -189,9 +192,7 @@
String mvn = windows ? "mvn.cmd" : "mvn";
String mavenHome = System.getenv("M2_HOME");
- if (mavenHome == null) {
- mavenHome = System.getenv("MAVEN_HOME");
- }
+ mavenHome = mavenHome == null ? System.getenv("MAVEN_HOME") : mavenHome;
boolean useWrapper = true;
if (mavenHome != null) {
@@ -207,43 +208,46 @@
}
if (useWrapper) {
- if (windows) {
- mvn = WebUtils.getAppHome().concat("/bin/mvnw.cmd");
- } else {
- mvn = WebUtils.getAppHome().concat("/bin/mvnw");
- }
+ mvn = WebUtils.getAppHome().concat(windows ? "/bin/mvnw.cmd" : "/bin/mvnw");
}
- StringBuilder cmdBuffer = new StringBuilder(mvn).append(" clean package -DskipTests ");
+ return renderCmd(mvn);
+ }
+ @Nonnull
+ private String renderCmd(String mvn) {
+ StringBuilder cmdBuffer = new StringBuilder(mvn).append(" clean package -DskipTests ");
+ renderCmdByBuildArgs(cmdBuffer);
+ renderCmdBySetting(cmdBuffer);
+ return cmdBuffer.toString();
+ }
+
+ private void renderCmdByBuildArgs(StringBuilder cmdBuffer) {
if (StringUtils.isNotBlank(this.buildArgs)) {
String args = getIllegalArgs(this.buildArgs);
- if (args != null) {
- throw new IllegalArgumentException(
- String.format(
- "Illegal argument: \"%s\" in maven build parameters: %s", args, this.buildArgs));
- }
+ Preconditions.checkArgument(
+ args == null,
+ "Illegal argument: \"%s\" in maven build parameters: %s",
+ args,
+ this.buildArgs);
cmdBuffer.append(this.buildArgs.trim());
}
+ }
+ private void renderCmdBySetting(StringBuilder cmdBuffer) {
String setting = InternalConfigHolder.get(CommonConfig.MAVEN_SETTINGS_PATH());
- if (StringUtils.isNotBlank(setting)) {
- String args = getIllegalArgs(setting);
- if (args != null) {
- throw new IllegalArgumentException(
- String.format("Illegal argument \"%s\" in maven-setting file path: %s", args, setting));
- }
- File file = new File(setting);
- if (file.exists() && file.isFile()) {
- cmdBuffer.append(" --settings ").append(setting);
- } else {
- throw new IllegalArgumentException(
- String.format(
- "Invalid maven-setting file path \"%s\", the path not exist or is not file",
- setting));
- }
+ if (StringUtils.isBlank(setting)) {
+ return;
}
- return cmdBuffer.toString();
+ String args = getIllegalArgs(setting);
+ Preconditions.checkArgument(
+ args == null, "Illegal argument \"%s\" in maven-setting file path: %s", args, setting);
+ File file = new File(setting);
+ Preconditions.checkArgument(
+        file.exists() && file.isFile(),
+ "Invalid maven-setting file path \"%s\", the path not exist or is not file",
+ setting);
+ cmdBuffer.append(" --settings ").append(setting);
}
private String getIllegalArgs(String param) {
@@ -269,11 +273,10 @@
@JsonIgnore
public String getMavenWorkHome() {
String buildHome = this.getAppSource().getAbsolutePath();
- if (StringUtils.isNotBlank(this.getPom())) {
- buildHome =
- new File(buildHome.concat("/").concat(this.getPom())).getParentFile().getAbsolutePath();
+ if (StringUtils.isBlank(this.getPom())) {
+ return buildHome;
}
- return buildHome;
+ return new File(buildHome.concat("/").concat(this.getPom())).getParentFile().getAbsolutePath();
}
@JsonIgnore
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index c338c47..032dc9e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -117,17 +117,8 @@
}
public synchronized void storageInitialize(StorageType storageType) {
- String appHome = WebUtils.getAppHome();
- if (StringUtils.isBlank(appHome)) {
- throw new ExceptionInInitializerError(
- String.format(
- "[StreamPark] Workspace path check failed,"
- + " The system initialization check failed. If started local for development and debugging,"
- + " please ensure the -D%s parameter is clearly specified,"
- + " more detail: https://streampark.apache.org/docs/user-guide/deployment",
- ConfigKeys.KEY_APP_HOME()));
- }
+ checkAppHome();
if (initialized.contains(storageType)) {
return;
@@ -137,6 +128,35 @@
Workspace workspace = Workspace.of(storageType);
// 1. prepare workspace dir
+ prepareWorkspace(storageType, fsOperator, workspace);
+ // 2. upload jar.
+ // 2.1) upload client jar
+ uploadClientJar(workspace, fsOperator);
+ // 2.2) upload plugin jar.
+ uploadPluginJar(workspace, fsOperator);
+ // 2.3) upload shims jar
+ uploadShimsJar(workspace, fsOperator);
+ // 2.4) create maven local repository dir
+ createMvnLocalRepoDir();
+
+ initialized.add(storageType);
+ }
+
+ private static void checkAppHome() {
+ final String appHome = WebUtils.getAppHome();
+ if (StringUtils.isBlank(appHome)) {
+ throw new ExceptionInInitializerError(
+ String.format(
+ "[StreamPark] Workspace path check failed,"
+ + " The system initialization check failed. If started local for development and debugging,"
+ + " please ensure the -D%s parameter is clearly specified,"
+ + " more detail: https://streampark.apache.org/docs/user-guide/deployment",
+ ConfigKeys.KEY_APP_HOME()));
+ }
+ }
+
+ private void prepareWorkspace(
+ StorageType storageType, FsOperator fsOperator, Workspace workspace) {
if (LFS == storageType) {
fsOperator.mkdirsIfNotExists(Workspace.APP_LOCAL_DIST());
}
@@ -148,9 +168,16 @@
workspace.APP_PYTHON(),
workspace.APP_JARS())
.forEach(fsOperator::mkdirsIfNotExists);
+ }
- // 2. upload jar.
- // 2.1) upload client jar
+ private static void createMvnLocalRepoDir() {
+ String localMavenRepo = Workspace.MAVEN_LOCAL_PATH();
+    if (!FsOperator.lfs().exists(localMavenRepo)) {
+ FsOperator.lfs().mkdirs(localMavenRepo);
+ }
+ }
+
+ private void uploadClientJar(Workspace workspace, FsOperator fsOperator) {
File client = WebUtils.getAppClientDir();
Utils.required(
client.exists() && client.listFiles().length > 0,
@@ -163,18 +190,9 @@
log.info("load client:{} to {}", file.getName(), appClient);
fsOperator.upload(file.getAbsolutePath(), appClient);
}
+ }
- // 2.2) upload plugin jar.
- String appPlugins = workspace.APP_PLUGINS();
- fsOperator.mkCleanDirs(appPlugins);
-
- File plugins = WebUtils.getAppPluginsDir();
- for (File file : plugins.listFiles(fileFilter)) {
- log.info("load plugin:{} to {}", file.getName(), appPlugins);
- fsOperator.upload(file.getAbsolutePath(), appPlugins);
- }
-
- // 2.3) upload shims jar
+ private void uploadShimsJar(Workspace workspace, FsOperator fsOperator) {
File[] shims =
WebUtils.getAppLibDir()
.listFiles(pathname -> pathname.getName().matches(PATTERN_FLINK_SHIMS_JAR.pattern()));
@@ -193,14 +211,17 @@
fsOperator.upload(file.getAbsolutePath(), shimsPath);
}
}
+ }
- // 2.4) create maven local repository dir
- String localMavenRepo = Workspace.MAVEN_LOCAL_PATH();
- if (FsOperator.lfs().exists(localMavenRepo)) {
- FsOperator.lfs().mkdirs(localMavenRepo);
+ private void uploadPluginJar(Workspace workspace, FsOperator fsOperator) {
+ String appPlugins = workspace.APP_PLUGINS();
+ fsOperator.mkCleanDirs(appPlugins);
+
+ File plugins = WebUtils.getAppPluginsDir();
+ for (File file : plugins.listFiles(fileFilter)) {
+ log.info("load plugin:{} to {}", file.getName(), appPlugins);
+ fsOperator.upload(file.getAbsolutePath(), appPlugins);
}
-
- initialized.add(storageType);
}
public void checkFlinkEnv(StorageType storageType, FlinkEnv flinkEnv) throws IOException {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
index 1175096..87d4e1c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/AlertServiceImpl.java
@@ -33,6 +33,8 @@
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
+import javax.annotation.Nonnull;
+
import java.util.List;
@Slf4j
@@ -57,35 +59,7 @@
return true;
}
// No use thread pool, ensure that the alarm can be sent successfully
- Tuple2<Boolean, AlertException> reduce =
- alertTypeEnums.stream()
- .map(
- alertTypeEnum -> {
- try {
- boolean alertRes =
- SpringContextUtils.getBean(alertTypeEnum.getClazz())
- .doAlert(params, alertTemplate);
- return new Tuple2<Boolean, AlertException>(alertRes, null);
- } catch (AlertException e) {
- return new Tuple2<>(false, e);
- }
- })
- .reduce(
- new Tuple2<>(true, null),
- (tp1, tp2) -> {
- boolean alertResult = tp1.f0 & tp2.f0;
- if (tp1.f1 == null && tp2.f1 == null) {
- return new Tuple2<>(tp1.f0 & tp2.f0, null);
- }
- if (tp1.f1 != null && tp2.f1 != null) {
- // merge multiple exception, and keep the details of the first exception
- AlertException alertException =
- new AlertException(
- tp1.f1.getMessage() + "\n" + tp2.f1.getMessage(), tp1.f1);
- return new Tuple2<>(alertResult, alertException);
- }
- return new Tuple2<>(alertResult, tp1.f1 == null ? tp2.f1 : tp1.f1);
- });
+ Tuple2<Boolean, AlertException> reduce = triggerAlert(alertTemplate, alertTypeEnums, params);
if (reduce.f1 != null) {
throw reduce.f1;
}
@@ -96,4 +70,36 @@
}
return false;
}
+
+ @Nonnull
+ private Tuple2<Boolean, AlertException> triggerAlert(
+ AlertTemplate alertTemplate, List<AlertTypeEnum> alertTypeEnums, AlertConfigParams params) {
+ return alertTypeEnums.stream()
+ .map(
+ alertTypeEnum -> {
+ try {
+ boolean alertRes =
+ SpringContextUtils.getBean(alertTypeEnum.getClazz())
+ .doAlert(params, alertTemplate);
+ return new Tuple2<Boolean, AlertException>(alertRes, null);
+ } catch (AlertException e) {
+ return new Tuple2<>(false, e);
+ }
+ })
+ .reduce(
+ new Tuple2<>(true, null),
+ (tp1, tp2) -> {
+ boolean alertResult = tp1.f0 & tp2.f0;
+ if (tp1.f1 == null && tp2.f1 == null) {
+ return new Tuple2<>(tp1.f0 & tp2.f0, null);
+ }
+ if (tp1.f1 != null && tp2.f1 != null) {
+ // merge multiple exception, and keep the details of the first exception
+ AlertException alertException =
+ new AlertException(tp1.f1.getMessage() + "\n" + tp2.f1.getMessage(), tp1.f1);
+ return new Tuple2<>(alertResult, alertException);
+ }
+ return new Tuple2<>(alertResult, tp1.f1 == null ? tp2.f1 : tp1.f1);
+ });
+ }
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/DingTalkAlertNotifyServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/DingTalkAlertNotifyServiceImpl.java
index 9016d7a..38b24dd 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/DingTalkAlertNotifyServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/DingTalkAlertNotifyServiceImpl.java
@@ -37,6 +37,7 @@
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
+import javax.annotation.Nonnull;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
@@ -73,29 +74,14 @@
if (StringUtils.hasLength(contacts)) {
Collections.addAll(contactList, contacts.split(","));
}
- String title = alertTemplate.getTitle();
- if (!contactList.isEmpty()) {
- StringJoiner joiner = new StringJoiner(",@", title + " @", "");
- contactList.forEach(joiner::add);
- title = joiner.toString();
- }
- Map<String, Object> contactMap = new HashMap<>();
- contactMap.put("atMobiles", contactList);
- contactMap.put("isAtAll", BooleanUtils.toBoolean(dingTalkParams.getIsAtAll()));
-
+ String title = renderTitle(alertTemplate, contactList);
+ Map<String, Object> contactMap = renderContact(contactList, dingTalkParams);
// format markdown
String markdown = FreemarkerUtils.format(template, alertTemplate);
+ Map<String, String> contentMap = renderContent(title, markdown);
+ Map<String, Object> bodyMap = renderBody(contentMap, contactMap);
- Map<String, String> content = new HashMap<>();
- content.put("title", title);
- content.put("text", markdown);
-
- Map<String, Object> body = new HashMap<>();
- body.put("msgtype", "markdown");
- body.put("markdown", content);
- body.put("at", contactMap);
-
- sendMessage(dingTalkParams, body);
+ sendMessage(dingTalkParams, bodyMap);
return true;
} catch (AlertException alertException) {
throw alertException;
@@ -104,6 +90,43 @@
}
}
+ @Nonnull
+ private Map<String, Object> renderBody(
+ Map<String, String> content, Map<String, Object> contactMap) {
+ Map<String, Object> body = new HashMap<>();
+ body.put("msgtype", "markdown");
+ body.put("markdown", content);
+ body.put("at", contactMap);
+ return body;
+ }
+
+ @Nonnull
+ private Map<String, String> renderContent(String title, String markdown) {
+ Map<String, String> content = new HashMap<>();
+ content.put("title", title);
+ content.put("text", markdown);
+ return content;
+ }
+
+ @Nonnull
+ private Map<String, Object> renderContact(
+ List<String> contactList, AlertDingTalkParams dingTalkParams) {
+ Map<String, Object> contactMap = new HashMap<>();
+ contactMap.put("atMobiles", contactList);
+ contactMap.put("isAtAll", BooleanUtils.toBoolean(dingTalkParams.getIsAtAll()));
+ return contactMap;
+ }
+
+ private String renderTitle(AlertTemplate alertTemplate, List<String> contactList) {
+ String title = alertTemplate.getTitle();
+ if (!contactList.isEmpty()) {
+ StringJoiner joiner = new StringJoiner(",@", title + " @", "");
+ contactList.forEach(joiner::add);
+ title = joiner.toString();
+ }
+ return title;
+ }
+
private void sendMessage(AlertDingTalkParams params, Map<String, Object> body)
throws AlertException {
// get webhook url
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/HttpCallbackAlertNotifyServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/HttpCallbackAlertNotifyServiceImpl.java
index 7568f9d..2686ed2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/HttpCallbackAlertNotifyServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/HttpCallbackAlertNotifyServiceImpl.java
@@ -41,6 +41,8 @@
import org.springframework.web.client.ResponseExtractor;
import org.springframework.web.client.RestTemplate;
+import javax.annotation.Nonnull;
+
import java.util.Map;
@Slf4j
@@ -77,24 +79,7 @@
private void sendMessage(AlertHttpCallbackParams params, Map<String, Object> body)
throws AlertException {
String url = params.getUrl();
- HttpHeaders headers = new HttpHeaders();
- String contentType = params.getContentType();
- MediaType mediaType = MediaType.APPLICATION_JSON;
- if (StringUtils.hasLength(contentType)) {
- switch (contentType.toLowerCase()) {
- case MediaType.APPLICATION_FORM_URLENCODED_VALUE:
- mediaType = MediaType.APPLICATION_FORM_URLENCODED;
- break;
- case MediaType.MULTIPART_FORM_DATA_VALUE:
- mediaType = MediaType.MULTIPART_FORM_DATA;
- break;
- case MediaType.APPLICATION_JSON_VALUE:
- default:
- break;
- }
- }
- headers.setContentType(mediaType);
-
+ HttpHeaders headers = getHttpHeaders(params);
ResponseEntity<Object> response;
try {
HttpMethod httpMethod = HttpMethod.POST;
@@ -119,4 +104,26 @@
throw new AlertException(String.format("Failed to request httpCallback alert,%nurl:%s", url));
}
}
+
+ @Nonnull
+ private HttpHeaders getHttpHeaders(AlertHttpCallbackParams params) {
+ HttpHeaders headers = new HttpHeaders();
+ String contentType = params.getContentType();
+ MediaType mediaType = MediaType.APPLICATION_JSON;
+ if (StringUtils.hasLength(contentType)) {
+ switch (contentType.toLowerCase()) {
+ case MediaType.APPLICATION_FORM_URLENCODED_VALUE:
+ mediaType = MediaType.APPLICATION_FORM_URLENCODED;
+ break;
+ case MediaType.MULTIPART_FORM_DATA_VALUE:
+ mediaType = MediaType.MULTIPART_FORM_DATA;
+ break;
+ case MediaType.APPLICATION_JSON_VALUE:
+ default:
+ break;
+ }
+ }
+ headers.setContentType(mediaType);
+ return headers;
+ }
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/LarkAlertNotifyServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/LarkAlertNotifyServiceImpl.java
index 248c6b4..86382c5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/LarkAlertNotifyServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/alert/impl/LarkAlertNotifyServiceImpl.java
@@ -37,6 +37,7 @@
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
+import javax.annotation.Nonnull;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
@@ -73,17 +74,7 @@
String markdown = FreemarkerUtils.format(template, alertTemplate);
Map<String, Object> cardMap =
mapper.readValue(markdown, new TypeReference<Map<String, Object>>() {});
-
- Map<String, Object> body = new HashMap<>();
- // get sign
- if (alertLarkParams.getSecretEnable()) {
- long timestamp = System.currentTimeMillis() / 1000;
- String sign = getSign(alertLarkParams.getSecretToken(), timestamp);
- body.put("timestamp", timestamp);
- body.put("sign", sign);
- }
- body.put("msg_type", "interactive");
- body.put("card", cardMap);
+ Map<String, Object> body = renderBody(alertLarkParams, cardMap);
sendMessage(alertLarkParams, body);
return true;
} catch (AlertException alertException) {
@@ -93,6 +84,22 @@
}
}
+ @Nonnull
+ private Map<String, Object> renderBody(
+ AlertLarkParams alertLarkParams, Map<String, Object> cardMap) {
+ Map<String, Object> body = new HashMap<>();
+ // get sign
+ if (alertLarkParams.getSecretEnable()) {
+ long timestamp = System.currentTimeMillis() / 1000;
+ String sign = getSign(alertLarkParams.getSecretToken(), timestamp);
+ body.put("timestamp", timestamp);
+ body.put("sign", sign);
+ }
+ body.put("msg_type", "interactive");
+ body.put("card", cardMap);
+ return body;
+ }
+
private void sendMessage(AlertLarkParams params, Map<String, Object> body) throws AlertException {
// get webhook url
String url = getWebhook(params);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 70f0231..2974faa 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -99,6 +99,7 @@
import com.google.common.collect.Sets;
import io.fabric8.kubernetes.client.KubernetesClientException;
import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
@@ -390,7 +391,6 @@
}
if (FlinkExecutionMode.isYarnMode(application.getFlinkExecutionMode())) {
-
ApiAlertException.throwIfTrue(
!applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(),
"[StreamPark] The same task name is already running in the yarn queue");
@@ -400,7 +400,6 @@
Utils.requireNotNull(buildPipeline);
FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
-
ApiAlertException.throwIfNull(flinkEnv, "[StreamPark] can no found flink version");
// if manually started, clear the restart flag
@@ -413,17 +412,9 @@
appParam.setSavePointed(true);
application.setRestartCount(application.getRestartCount() + 1);
}
-
// 2) update app state to starting...
starting(application);
-
- String jobId = new JobID().toHexString();
- ApplicationLog applicationLog = new ApplicationLog();
- applicationLog.setOptionName(OperationEnum.START.getValue());
- applicationLog.setAppId(application.getId());
- applicationLog.setOptionTime(new Date());
- applicationLog.setUserId(commonService.getUserId());
-
+ ApplicationLog applicationLog = constructAppLog(application);
// set the latest to Effective, (it will only become the current effective at this time)
applicationManageService.toEffective(application);
@@ -468,7 +459,7 @@
flinkEnv.getFlinkConf(),
FlinkDevelopmentMode.of(application.getJobType()),
application.getId(),
- jobId,
+ new JobID().toHexString(),
application.getJobName(),
appConf,
application.getApplicationType(),
@@ -491,85 +482,114 @@
(response, throwable) -> {
// 1) remove Future
startFutureMap.remove(application.getId());
-
// 2) exception
if (throwable != null) {
- String exception = ExceptionUtils.stringifyException(throwable);
- applicationLog.setException(exception);
- applicationLog.setSuccess(false);
- applicationLogService.save(applicationLog);
- if (throwable instanceof CancellationException) {
- doStopped(application.getId());
- } else {
- Application app = getById(appParam.getId());
- app.setState(FlinkAppStateEnum.FAILED.getValue());
- app.setOptionState(OptionStateEnum.NONE.getValue());
- updateById(app);
- if (isKubernetesApp(app)) {
- k8SFlinkTrackMonitor.unWatching(toTrackId(app));
- } else {
- FlinkAppHttpWatcher.unWatching(appParam.getId());
- }
- }
+ processForException(appParam, throwable, applicationLog, application);
return;
}
-
// 3) success
- applicationLog.setSuccess(true);
- if (response.flinkConfig() != null) {
- String jmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
- if (jmMemory != null) {
- application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
- }
- String tmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY());
- if (tmMemory != null) {
- application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
- }
- }
- application.setAppId(response.clusterId());
- if (StringUtils.isNoneEmpty(response.jobId())) {
- application.setJobId(response.jobId());
- }
-
- if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
- application.setJobManagerUrl(response.jobManagerUrl());
- applicationLog.setJobManagerUrl(response.jobManagerUrl());
- }
- applicationLog.setYarnAppId(response.clusterId());
- application.setStartTime(new Date());
- application.setEndTime(null);
-
- // if start completed, will be added task to tracking queue
- if (isKubernetesApp(application)) {
- application.setRelease(ReleaseStateEnum.DONE.get());
- k8SFlinkTrackMonitor.doWatching(toTrackId(application));
- if (FlinkExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
- String domainName = settingService.getIngressModeDefault();
- if (StringUtils.isNotBlank(domainName)) {
- try {
- IngressController.configureIngress(
- domainName, application.getClusterId(), application.getK8sNamespace());
- } catch (KubernetesClientException e) {
- log.info("Failed to create ingress, stack info:{}", e.getMessage());
- applicationLog.setException(e.getMessage());
- applicationLog.setSuccess(false);
- applicationLogService.save(applicationLog);
- application.setState(FlinkAppStateEnum.FAILED.getValue());
- application.setOptionState(OptionStateEnum.NONE.getValue());
- }
- }
- }
- } else {
- FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.STARTING);
- FlinkAppHttpWatcher.doWatching(application);
- }
- // update app
- updateById(application);
- // save log
- applicationLogService.save(applicationLog);
+ processForSuccess(appParam, response, applicationLog, application);
});
}
+ @NotNull
+ private ApplicationLog constructAppLog(Application application) {
+ ApplicationLog applicationLog = new ApplicationLog();
+ applicationLog.setOptionName(OperationEnum.START.getValue());
+ applicationLog.setAppId(application.getId());
+ applicationLog.setOptionTime(new Date());
+ applicationLog.setUserId(commonService.getUserId());
+ return applicationLog;
+ }
+
+ private void processForSuccess(
+ Application appParam,
+ SubmitResponse response,
+ ApplicationLog applicationLog,
+ Application application) {
+ applicationLog.setSuccess(true);
+ if (response.flinkConfig() != null) {
+ String jmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
+ if (jmMemory != null) {
+ application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
+ }
+ String tmMemory = response.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY());
+ if (tmMemory != null) {
+ application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
+ }
+ }
+ application.setAppId(response.clusterId());
+ if (StringUtils.isNoneEmpty(response.jobId())) {
+ application.setJobId(response.jobId());
+ }
+
+ if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
+ application.setJobManagerUrl(response.jobManagerUrl());
+ applicationLog.setJobManagerUrl(response.jobManagerUrl());
+ }
+ applicationLog.setYarnAppId(response.clusterId());
+ application.setStartTime(new Date());
+ application.setEndTime(null);
+
+ // if start completed, a task will be added to the tracking queue
+ if (isKubernetesApp(application)) {
+ processForK8sApp(application, applicationLog);
+ } else {
+ FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.STARTING);
+ FlinkAppHttpWatcher.doWatching(application);
+ }
+ // update app
+ updateById(application);
+ // save log
+ applicationLogService.save(applicationLog);
+ }
+
+ private void processForK8sApp(Application application, ApplicationLog applicationLog) {
+ application.setRelease(ReleaseStateEnum.DONE.get());
+ k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+ if (!FlinkExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
+ return;
+ }
+ String domainName = settingService.getIngressModeDefault();
+ if (StringUtils.isNotBlank(domainName)) {
+ try {
+ IngressController.configureIngress(
+ domainName, application.getClusterId(), application.getK8sNamespace());
+ } catch (KubernetesClientException e) {
+ log.info("Failed to create ingress, stack info:{}", e.getMessage());
+ applicationLog.setException(e.getMessage());
+ applicationLog.setSuccess(false);
+ applicationLogService.save(applicationLog);
+ application.setState(FlinkAppStateEnum.FAILED.getValue());
+ application.setOptionState(OptionStateEnum.NONE.getValue());
+ }
+ }
+ }
+
+ private void processForException(
+ Application appParam,
+ Throwable throwable,
+ ApplicationLog applicationLog,
+ Application application) {
+ String exception = ExceptionUtils.stringifyException(throwable);
+ applicationLog.setException(exception);
+ applicationLog.setSuccess(false);
+ applicationLogService.save(applicationLog);
+ if (throwable instanceof CancellationException) {
+ doStopped(application.getId());
+ } else {
+ Application app = getById(appParam.getId());
+ app.setState(FlinkAppStateEnum.FAILED.getValue());
+ app.setOptionState(OptionStateEnum.NONE.getValue());
+ updateById(app);
+ if (isKubernetesApp(app)) {
+ k8SFlinkTrackMonitor.unWatching(toTrackId(app));
+ } else {
+ FlinkAppHttpWatcher.unWatching(appParam.getId());
+ }
+ }
+ }
+
/**
* Check whether a job with the same name is running in the yarn queue
*
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 9652666..2a59bad 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -64,6 +64,8 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import javax.annotation.Nonnull;
+
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
@@ -147,26 +149,15 @@
}
JobsOverview.Task task = app.getOverview();
if (task != null) {
- overview.setTotal(overview.getTotal() + task.getTotal());
- overview.setCreated(overview.getCreated() + task.getCreated());
- overview.setScheduled(overview.getScheduled() + task.getScheduled());
- overview.setDeploying(overview.getDeploying() + task.getDeploying());
- overview.setRunning(overview.getRunning() + task.getRunning());
- overview.setFinished(overview.getFinished() + task.getFinished());
- overview.setCanceling(overview.getCanceling() + task.getCanceling());
- overview.setCanceled(overview.getCanceled() + task.getCanceled());
- overview.setFailed(overview.getFailed() + task.getFailed());
- overview.setReconciling(overview.getReconciling() + task.getReconciling());
+ renderJobsOverviewTaskByTask(overview, task);
}
}
// merge metrics from flink kubernetes cluster
- FlinkMetricCV k8sMetric;
- if (K8sFlinkConfig.isV2Enabled()) {
- k8sMetric = flinkK8sObserver.getAggClusterMetricCV(teamId);
- } else {
- k8sMetric = k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString());
- }
+ FlinkMetricCV k8sMetric =
+ K8sFlinkConfig.isV2Enabled()
+ ? flinkK8sObserver.getAggClusterMetricCV(teamId)
+ : k8SFlinkTrackMonitor.getAccGroupMetrics(teamId.toString());
if (k8sMetric != null) {
totalJmMemory += k8sMetric.totalJmMemory();
totalTmMemory += k8sMetric.totalTmMemory();
@@ -174,14 +165,45 @@
totalSlot += k8sMetric.totalSlot();
availableSlot += k8sMetric.availableSlot();
runningJob += k8sMetric.runningJob();
- overview.setTotal(overview.getTotal() + k8sMetric.totalJob());
- overview.setRunning(overview.getRunning() + k8sMetric.runningJob());
- overview.setFinished(overview.getFinished() + k8sMetric.finishedJob());
- overview.setCanceled(overview.getCanceled() + k8sMetric.cancelledJob());
- overview.setFailed(overview.getFailed() + k8sMetric.failedJob());
+ renderJobsOverviewTaskByK8sMetric(overview, k8sMetric);
}
// result json
+ return constructDashboardMap(
+ overview, totalJmMemory, totalTmMemory, totalTm, availableSlot, totalSlot, runningJob);
+ }
+
+ private void renderJobsOverviewTaskByTask(JobsOverview.Task overview, JobsOverview.Task task) {
+ overview.setTotal(overview.getTotal() + task.getTotal());
+ overview.setCreated(overview.getCreated() + task.getCreated());
+ overview.setScheduled(overview.getScheduled() + task.getScheduled());
+ overview.setDeploying(overview.getDeploying() + task.getDeploying());
+ overview.setRunning(overview.getRunning() + task.getRunning());
+ overview.setFinished(overview.getFinished() + task.getFinished());
+ overview.setCanceling(overview.getCanceling() + task.getCanceling());
+ overview.setCanceled(overview.getCanceled() + task.getCanceled());
+ overview.setFailed(overview.getFailed() + task.getFailed());
+ overview.setReconciling(overview.getReconciling() + task.getReconciling());
+ }
+
+ private void renderJobsOverviewTaskByK8sMetric(
+ JobsOverview.Task overview, FlinkMetricCV k8sMetric) {
+ overview.setTotal(overview.getTotal() + k8sMetric.totalJob());
+ overview.setRunning(overview.getRunning() + k8sMetric.runningJob());
+ overview.setFinished(overview.getFinished() + k8sMetric.finishedJob());
+ overview.setCanceled(overview.getCanceled() + k8sMetric.cancelledJob());
+ overview.setFailed(overview.getFailed() + k8sMetric.failedJob());
+ }
+
+ @Nonnull
+ private Map<String, Serializable> constructDashboardMap(
+ JobsOverview.Task overview,
+ Integer totalJmMemory,
+ Integer totalTmMemory,
+ Integer totalTm,
+ Integer availableSlot,
+ Integer totalSlot,
+ Integer runningJob) {
Map<String, Serializable> dashboardDataMap = new HashMap<>(8);
dashboardDataMap.put("task", overview);
dashboardDataMap.put("jmMemory", totalJmMemory);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 102e073..0fce782 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -275,29 +275,12 @@
// set duration
String restUrl = k8SFlinkTrackMonitor.getRemoteRestUrl(toTrackId(record));
record.setFlinkRestUrl(restUrl);
- if (record.getTracking() == 1
- && record.getStartTime() != null
- && record.getStartTime().getTime() > 0) {
- record.setDuration(now - record.getStartTime().getTime());
- }
+ setAppDurationIfNeeded(record, now);
}
if (pipeStates.containsKey(record.getId())) {
record.setBuildStatus(pipeStates.get(record.getId()).getCode());
}
-
- AppControl appControl =
- new AppControl()
- .setAllowBuild(
- record.getBuildStatus() == null
- || !PipelineStatusEnum.running
- .getCode()
- .equals(record.getBuildStatus()))
- .setAllowStart(
- !record.shouldTracking()
- && PipelineStatusEnum.success
- .getCode()
- .equals(record.getBuildStatus()))
- .setAllowStop(record.isRunning());
+ AppControl appControl = getAppControl(record);
record.setAppControl(appControl);
})
.collect(Collectors.toList());
@@ -305,6 +288,25 @@
return page;
}
+ private void setAppDurationIfNeeded(Application record, long now) {
+ if (record.getTracking() == 1
+ && record.getStartTime() != null
+ && record.getStartTime().getTime() > 0) {
+ record.setDuration(now - record.getStartTime().getTime());
+ }
+ }
+
+ private AppControl getAppControl(Application record) {
+ return new AppControl()
+ .setAllowBuild(
+ record.getBuildStatus() == null
+ || !PipelineStatusEnum.running.getCode().equals(record.getBuildStatus()))
+ .setAllowStart(
+ !record.shouldTracking()
+ && PipelineStatusEnum.success.getCode().equals(record.getBuildStatus()))
+ .setAllowStop(record.isRunning());
+ }
+
@Override
public void changeOwnership(Long userId, Long targetUserId) {
LambdaUpdateWrapper<Application> updateWrapper =
@@ -792,11 +794,7 @@
// set duration
long now = System.currentTimeMillis();
- if (application.getTracking() == 1
- && application.getStartTime() != null
- && application.getStartTime().getTime() > 0) {
- application.setDuration(now - application.getStartTime().getTime());
- }
+ setAppDurationIfNeeded(application, now);
}
application.setYarnQueueByHotParams();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index a43e1fe..37c09e9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -168,16 +168,12 @@
* @return Whether the pipeline was successfully started
*/
@Override
- public boolean buildApplication(@NotNull Long appId, boolean forceBuild) {
+ public boolean buildApplication(@Nonnull Long appId, boolean forceBuild) {
// check the build environment
checkBuildEnv(appId, forceBuild);
Application app = applicationManageService.getById(appId);
- ApplicationLog applicationLog = new ApplicationLog();
- applicationLog.setOptionName(RELEASE.getValue());
- applicationLog.setAppId(app.getId());
- applicationLog.setOptionTime(new Date());
- applicationLog.setUserId(commonService.getUserId());
+ ApplicationLog applicationLog = getApplicationLog(app);
// check if you need to go through the build process (if the jar and pom have changed,
// you need to go through the build process, if other common parameters are modified,
@@ -361,25 +357,7 @@
});
// save docker resolve progress detail to cache, only for flink-k8s application mode.
if (PipelineTypeEnum.FLINK_NATIVE_K8S_APPLICATION == pipeline.pipeType()) {
- pipeline
- .as(FlinkK8sApplicationBuildPipeline.class)
- .registerDockerProgressWatcher(
- new DockerProgressWatcher() {
- @Override
- public void onDockerPullProgressChange(DockerPullSnapshot snapshot) {
- DOCKER_PULL_PG_SNAPSHOTS.put(app.getId(), snapshot);
- }
-
- @Override
- public void onDockerBuildProgressChange(DockerBuildSnapshot snapshot) {
- DOCKER_BUILD_PG_SNAPSHOTS.put(app.getId(), snapshot);
- }
-
- @Override
- public void onDockerPushProgressChange(DockerPushSnapshot snapshot) {
- DOCKER_PUSH_PG_SNAPSHOTS.put(app.getId(), snapshot);
- }
- });
+ registerDockerProgressWatcher(pipeline, app);
}
// save pipeline instance snapshot to db before release it.
AppBuildPipeline buildPipeline =
@@ -393,6 +371,38 @@
return saved;
}
+ private void registerDockerProgressWatcher(BuildPipeline pipeline, Application app) {
+ pipeline
+ .as(FlinkK8sApplicationBuildPipeline.class)
+ .registerDockerProgressWatcher(
+ new DockerProgressWatcher() {
+ @Override
+ public void onDockerPullProgressChange(DockerPullSnapshot snapshot) {
+ DOCKER_PULL_PG_SNAPSHOTS.put(app.getId(), snapshot);
+ }
+
+ @Override
+ public void onDockerBuildProgressChange(DockerBuildSnapshot snapshot) {
+ DOCKER_BUILD_PG_SNAPSHOTS.put(app.getId(), snapshot);
+ }
+
+ @Override
+ public void onDockerPushProgressChange(DockerPushSnapshot snapshot) {
+ DOCKER_PUSH_PG_SNAPSHOTS.put(app.getId(), snapshot);
+ }
+ });
+ }
+
+ @Nonnull
+ private ApplicationLog getApplicationLog(Application app) {
+ ApplicationLog applicationLog = new ApplicationLog();
+ applicationLog.setOptionName(RELEASE.getValue());
+ applicationLog.setAppId(app.getId());
+ applicationLog.setOptionTime(new Date());
+ applicationLog.setUserId(commonService.getUserId());
+ return applicationLog;
+ }
+
/**
* check the build environment
*
@@ -443,69 +453,26 @@
localWorkspace = app.getLocalAppHome();
}
FlinkYarnApplicationBuildRequest yarnAppRequest =
- new FlinkYarnApplicationBuildRequest(
- app.getJobName(),
- mainClass,
- localWorkspace,
- yarnProvidedPath,
- app.getDevelopmentMode(),
- getMergedDependencyInfo(app));
+ buildFlinkYarnApplicationBuildRequest(app, mainClass, localWorkspace, yarnProvidedPath);
log.info("Submit params to building pipeline : {}", yarnAppRequest);
return FlinkYarnApplicationBuildPipeline.of(yarnAppRequest);
case YARN_PER_JOB:
case YARN_SESSION:
case REMOTE:
FlinkRemotePerJobBuildRequest buildRequest =
- new FlinkRemotePerJobBuildRequest(
- app.getJobName(),
- app.getLocalAppHome(),
- mainClass,
- flinkUserJar,
- app.isCustomCodeJob(),
- app.getFlinkExecutionMode(),
- app.getDevelopmentMode(),
- flinkEnv.getFlinkVersion(),
- getMergedDependencyInfo(app));
+ buildFlinkRemotePerJobBuildRequest(app, mainClass, flinkUserJar, flinkEnv);
log.info("Submit params to building pipeline : {}", buildRequest);
return FlinkRemoteBuildPipeline.of(buildRequest);
case KUBERNETES_NATIVE_SESSION:
FlinkK8sSessionBuildRequest k8sSessionBuildRequest =
- new FlinkK8sSessionBuildRequest(
- app.getJobName(),
- app.getLocalAppHome(),
- mainClass,
- flinkUserJar,
- app.getFlinkExecutionMode(),
- app.getDevelopmentMode(),
- flinkEnv.getFlinkVersion(),
- getMergedDependencyInfo(app),
- app.getClusterId(),
- app.getK8sNamespace());
+ buildFlinkK8sSessionBuildRequest(app, mainClass, flinkUserJar, flinkEnv);
log.info("Submit params to building pipeline : {}", k8sSessionBuildRequest);
return FlinkK8sSessionBuildPipeline.of(k8sSessionBuildRequest);
case KUBERNETES_NATIVE_APPLICATION:
DockerConfig dockerConfig = settingService.getDockerConfig();
FlinkK8sApplicationBuildRequest k8sApplicationBuildRequest =
- new FlinkK8sApplicationBuildRequest(
- app.getJobName(),
- app.getLocalAppHome(),
- mainClass,
- flinkUserJar,
- app.getFlinkExecutionMode(),
- app.getDevelopmentMode(),
- flinkEnv.getFlinkVersion(),
- getMergedDependencyInfo(app),
- app.getClusterId(),
- app.getK8sNamespace(),
- app.getFlinkImage(),
- app.getK8sPodTemplates(),
- app.getK8sHadoopIntegration() != null ? app.getK8sHadoopIntegration() : false,
- DockerConf.of(
- dockerConfig.getAddress(),
- dockerConfig.getNamespace(),
- dockerConfig.getUser(),
- dockerConfig.getPassword()),
- app.getIngressTemplate());
+ buildFlinkK8sApplicationBuildRequest(
+ app, mainClass, flinkUserJar, flinkEnv, dockerConfig);
log.info("Submit params to building pipeline : {}", k8sApplicationBuildRequest);
if (K8sFlinkConfig.isV2Enabled()) {
return FlinkK8sApplicationBuildPipelineV2.of(k8sApplicationBuildRequest);
@@ -517,6 +484,84 @@
}
}
+ @NotNull
+ private FlinkYarnApplicationBuildRequest buildFlinkYarnApplicationBuildRequest(
+ @NotNull Application app, String mainClass, String localWorkspace, String yarnProvidedPath) {
+ FlinkYarnApplicationBuildRequest yarnAppRequest =
+ new FlinkYarnApplicationBuildRequest(
+ app.getJobName(),
+ mainClass,
+ localWorkspace,
+ yarnProvidedPath,
+ app.getDevelopmentMode(),
+ getMergedDependencyInfo(app));
+ return yarnAppRequest;
+ }
+
+ @Nonnull
+ private FlinkK8sApplicationBuildRequest buildFlinkK8sApplicationBuildRequest(
+ @Nonnull Application app,
+ String mainClass,
+ String flinkUserJar,
+ FlinkEnv flinkEnv,
+ DockerConfig dockerConfig) {
+ FlinkK8sApplicationBuildRequest k8sApplicationBuildRequest =
+ new FlinkK8sApplicationBuildRequest(
+ app.getJobName(),
+ app.getLocalAppHome(),
+ mainClass,
+ flinkUserJar,
+ app.getFlinkExecutionMode(),
+ app.getDevelopmentMode(),
+ flinkEnv.getFlinkVersion(),
+ getMergedDependencyInfo(app),
+ app.getClusterId(),
+ app.getK8sNamespace(),
+ app.getFlinkImage(),
+ app.getK8sPodTemplates(),
+ app.getK8sHadoopIntegration() != null ? app.getK8sHadoopIntegration() : false,
+ DockerConf.of(
+ dockerConfig.getAddress(),
+ dockerConfig.getNamespace(),
+ dockerConfig.getUser(),
+ dockerConfig.getPassword()),
+ app.getIngressTemplate());
+ return k8sApplicationBuildRequest;
+ }
+
+ @Nonnull
+ private FlinkK8sSessionBuildRequest buildFlinkK8sSessionBuildRequest(
+ @Nonnull Application app, String mainClass, String flinkUserJar, FlinkEnv flinkEnv) {
+ FlinkK8sSessionBuildRequest k8sSessionBuildRequest =
+ new FlinkK8sSessionBuildRequest(
+ app.getJobName(),
+ app.getLocalAppHome(),
+ mainClass,
+ flinkUserJar,
+ app.getFlinkExecutionMode(),
+ app.getDevelopmentMode(),
+ flinkEnv.getFlinkVersion(),
+ getMergedDependencyInfo(app),
+ app.getClusterId(),
+ app.getK8sNamespace());
+ return k8sSessionBuildRequest;
+ }
+
+ @Nonnull
+ private FlinkRemotePerJobBuildRequest buildFlinkRemotePerJobBuildRequest(
+ @Nonnull Application app, String mainClass, String flinkUserJar, FlinkEnv flinkEnv) {
+ return new FlinkRemotePerJobBuildRequest(
+ app.getJobName(),
+ app.getLocalAppHome(),
+ mainClass,
+ flinkUserJar,
+ app.isCustomCodeJob(),
+ app.getFlinkExecutionMode(),
+ app.getDevelopmentMode(),
+ flinkEnv.getFlinkVersion(),
+ getMergedDependencyInfo(app));
+ }
+
/** copy from {@link ApplicationActionService#start(Application, boolean)} */
private String retrieveFlinkUserJar(FlinkEnv flinkEnv, Application app) {
switch (app.getDevelopmentMode()) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
index 76220dd..d6d3c90 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java
@@ -102,60 +102,70 @@
// flink sql job
ApplicationConfig latestConfig = getLatest(appParam.getId());
if (appParam.isFlinkSqlJob()) {
- // get effect config
- ApplicationConfig effectiveConfig = getEffective(appParam.getId());
- if (Utils.isEmpty(appParam.getConfig())) {
- if (effectiveConfig != null) {
- effectiveService.remove(appParam.getId(), EffectiveTypeEnum.CONFIG);
+ updateForFlinkSqlJob(appParam, latest, latestConfig);
+ } else {
+ updateForNonFlinkSqlJob(appParam, latest, latestConfig);
+ }
+ }
+
+ private void updateForNonFlinkSqlJob(
+ Application appParam, Boolean latest, ApplicationConfig latestConfig) {
+ // a config file may have been re-selected (without a config id), or this may be based on
+ // an original edit (with a config id).
+ Long configId = appParam.getConfigId();
+ // an original edit
+ if (configId != null) {
+ ApplicationConfig config = this.getById(configId);
+ String decode = new String(Base64.getDecoder().decode(appParam.getConfig()));
+ String encode = DeflaterUtils.zipString(decode.trim());
+ // create...
+ if (!config.getContent().equals(encode)) {
+ if (latestConfig != null) {
+ removeById(latestConfig.getId());
}
+ this.create(appParam, latest);
} else {
- // there was no configuration before, is a new configuration
- if (effectiveConfig == null) {
- if (latestConfig != null) {
- removeById(latestConfig.getId());
- }
- this.create(appParam, latest);
- } else {
- String decode = new String(Base64.getDecoder().decode(appParam.getConfig()));
- String encode = DeflaterUtils.zipString(decode.trim());
- // need to diff the two configs are consistent
- if (!effectiveConfig.getContent().equals(encode)) {
- if (latestConfig != null) {
- removeById(latestConfig.getId());
- }
- this.create(appParam, latest);
- }
- }
+ this.setLatestOrEffective(latest, configId, appParam.getId());
}
} else {
- // may be re-selected a config file (without config id), or may be based on an original edit
- // (with config Id).
- Long configId = appParam.getConfigId();
- // an original edit
- if (configId != null) {
- ApplicationConfig config = this.getById(configId);
+ ApplicationConfig config = getEffective(appParam.getId());
+ if (config != null) {
String decode = new String(Base64.getDecoder().decode(appParam.getConfig()));
String encode = DeflaterUtils.zipString(decode.trim());
// create...
if (!config.getContent().equals(encode)) {
+ this.create(appParam, latest);
+ }
+ } else {
+ this.create(appParam, latest);
+ }
+ }
+ }
+
+ private void updateForFlinkSqlJob(
+ Application appParam, Boolean latest, ApplicationConfig latestConfig) {
+ // get the effective config
+ ApplicationConfig effectiveConfig = getEffective(appParam.getId());
+ if (Utils.isEmpty(appParam.getConfig())) {
+ if (effectiveConfig != null) {
+ effectiveService.remove(appParam.getId(), EffectiveTypeEnum.CONFIG);
+ }
+ } else {
+ // there was no configuration before, so this is a new configuration
+ if (effectiveConfig == null) {
+ if (latestConfig != null) {
+ removeById(latestConfig.getId());
+ }
+ this.create(appParam, latest);
+ } else {
+ String decode = new String(Base64.getDecoder().decode(appParam.getConfig()));
+ String encode = DeflaterUtils.zipString(decode.trim());
+ // need to check whether the two configs are consistent
+ if (!effectiveConfig.getContent().equals(encode)) {
if (latestConfig != null) {
removeById(latestConfig.getId());
}
this.create(appParam, latest);
- } else {
- this.setLatestOrEffective(latest, configId, appParam.getId());
- }
- } else {
- ApplicationConfig config = getEffective(appParam.getId());
- if (config != null) {
- String decode = new String(Base64.getDecoder().decode(appParam.getConfig()));
- String encode = DeflaterUtils.zipString(decode.trim());
- // create...
- if (!config.getContent().equals(encode)) {
- this.create(appParam, latest);
- }
- } else {
- this.create(appParam, latest);
}
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 6587436..ea80ed2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -215,24 +215,10 @@
flinkCluster.setAlertId(paramOfCluster.getAlertId());
flinkCluster.setDescription(paramOfCluster.getDescription());
if (FlinkExecutionMode.isRemoteMode(flinkCluster.getFlinkExecutionModeEnum())) {
- flinkCluster.setAddress(paramOfCluster.getAddress());
- flinkCluster.setClusterState(ClusterState.RUNNING.getState());
- flinkCluster.setStartTime(new Date());
- flinkCluster.setEndTime(null);
+ updateFlinkClusterForRemoteMode(paramOfCluster, flinkCluster);
FlinkClusterWatcher.addWatching(flinkCluster);
} else {
- flinkCluster.setClusterId(paramOfCluster.getClusterId());
- flinkCluster.setVersionId(paramOfCluster.getVersionId());
- flinkCluster.setDynamicProperties(paramOfCluster.getDynamicProperties());
- flinkCluster.setOptions(paramOfCluster.getOptions());
- flinkCluster.setResolveOrder(paramOfCluster.getResolveOrder());
- flinkCluster.setK8sHadoopIntegration(paramOfCluster.getK8sHadoopIntegration());
- flinkCluster.setK8sConf(paramOfCluster.getK8sConf());
- flinkCluster.setK8sNamespace(paramOfCluster.getK8sNamespace());
- flinkCluster.setK8sRestExposedType(paramOfCluster.getK8sRestExposedType());
- flinkCluster.setServiceAccount(paramOfCluster.getServiceAccount());
- flinkCluster.setFlinkImage(paramOfCluster.getFlinkImage());
- flinkCluster.setYarnQueue(paramOfCluster.getYarnQueue());
+ updateFlinkClusterForNonRemoteModes(paramOfCluster, flinkCluster);
}
if (shouldWatchForK8s(flinkCluster)) {
flinkK8sObserver.watchFlinkCluster(flinkCluster);
@@ -240,6 +226,30 @@
updateById(flinkCluster);
}
+ private void updateFlinkClusterForNonRemoteModes(
+ FlinkCluster paramOfCluster, FlinkCluster flinkCluster) {
+ flinkCluster.setClusterId(paramOfCluster.getClusterId());
+ flinkCluster.setVersionId(paramOfCluster.getVersionId());
+ flinkCluster.setDynamicProperties(paramOfCluster.getDynamicProperties());
+ flinkCluster.setOptions(paramOfCluster.getOptions());
+ flinkCluster.setResolveOrder(paramOfCluster.getResolveOrder());
+ flinkCluster.setK8sHadoopIntegration(paramOfCluster.getK8sHadoopIntegration());
+ flinkCluster.setK8sConf(paramOfCluster.getK8sConf());
+ flinkCluster.setK8sNamespace(paramOfCluster.getK8sNamespace());
+ flinkCluster.setK8sRestExposedType(paramOfCluster.getK8sRestExposedType());
+ flinkCluster.setServiceAccount(paramOfCluster.getServiceAccount());
+ flinkCluster.setFlinkImage(paramOfCluster.getFlinkImage());
+ flinkCluster.setYarnQueue(paramOfCluster.getYarnQueue());
+ }
+
+ private void updateFlinkClusterForRemoteMode(
+ FlinkCluster paramOfCluster, FlinkCluster flinkCluster) {
+ flinkCluster.setAddress(paramOfCluster.getAddress());
+ flinkCluster.setClusterState(ClusterState.RUNNING.getState());
+ flinkCluster.setStartTime(new Date());
+ flinkCluster.setEndTime(null);
+ }
+
@Override
public void shutdown(FlinkCluster cluster) {
FlinkCluster flinkCluster = this.getById(cluster.getId());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 092a70d..cdb1859 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -116,15 +116,7 @@
ApiAlertException.throwIfFalse(
!project.getBuildState().equals(BuildStateEnum.BUILDING.get()),
"The project is being built, update project failed.");
- project.setName(projectParam.getName());
- project.setUrl(projectParam.getUrl());
- project.setBranches(projectParam.getBranches());
- project.setPrvkeyPath(projectParam.getPrvkeyPath());
- project.setUserName(projectParam.getUserName());
- project.setPassword(projectParam.getPassword());
- project.setPom(projectParam.getPom());
- project.setDescription(projectParam.getDescription());
- project.setBuildArgs(projectParam.getBuildArgs());
+ updateInternal(projectParam, project);
if (project.isSshRepositoryUrl()) {
project.setUserName(null);
} else {
@@ -148,6 +140,18 @@
return true;
}
+ private static void updateInternal(Project projectParam, Project project) {
+ project.setName(projectParam.getName());
+ project.setUrl(projectParam.getUrl());
+ project.setBranches(projectParam.getBranches());
+ project.setPrvkeyPath(projectParam.getPrvkeyPath());
+ project.setUserName(projectParam.getUserName());
+ project.setPassword(projectParam.getPassword());
+ project.setPom(projectParam.getPom());
+ project.setDescription(projectParam.getDescription());
+ project.setBuildArgs(projectParam.getBuildArgs());
+ }
+
@Override
public boolean removeById(Long id) {
Project project = getById(id);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index 29e6e77..f2f2ef3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -129,39 +129,18 @@
Dependency dependency = Dependency.toDependency(resourceStr);
List<String> jars = dependency.getJar();
List<MavenPom> poms = dependency.getPom();
-
- ApiAlertException.throwIfTrue(
- jars.isEmpty() && poms.isEmpty(), "Please add pom or jar resource.");
-
- ApiAlertException.throwIfTrue(
- resource.getResourceType() == ResourceTypeEnum.FLINK_APP && jars.isEmpty(),
- "Please upload jar for Flink_App resource");
-
- ApiAlertException.throwIfTrue(
- jars.size() + poms.size() > 1, "Please do not add multi dependency at one time.");
+ check(resource, jars, poms);
if (resource.getResourceType() == ResourceTypeEnum.CONNECTOR) {
- String connector = resource.getConnector();
- ApiAlertException.throwIfTrue(connector == null, "the flink connector is null.");
- FlinkConnector connectorResource = JacksonUtils.read(connector, FlinkConnector.class);
- resource.setResourceName(connectorResource.getFactoryIdentifier());
- Optional.ofNullable(connectorResource.getRequiredOptions())
- .ifPresent(
- v ->
- resource.setConnectorRequiredOptions(
- ExceptionUtils.wrapRuntimeException(v, JacksonUtils::write)));
- Optional.ofNullable(connectorResource.getOptionalOptions())
- .ifPresent(
- v ->
- resource.setConnectorOptionalOptions(
- ExceptionUtils.wrapRuntimeException(v, JacksonUtils::write)));
+ processConnectorResource(resource);
} else {
ApiAlertException.throwIfNull(resource.getResourceName(), "The resourceName is required.");
}
- ApiAlertException.throwIfTrue(
- this.findByResourceName(resource.getTeamId(), resource.getResourceName()) != null,
- String.format("the resource %s already exists, please check.", resource.getResourceName()));
+ ApiAlertException.throwIfNotNull(
+ this.findByResourceName(resource.getTeamId(), resource.getResourceName()),
+ "the resource %s already exists, please check.",
+ resource.getResourceName());
if (!jars.isEmpty()) {
String resourcePath = jars.get(0);
@@ -175,6 +154,33 @@
this.save(resource);
}
+ private static void processConnectorResource(Resource resource) throws JsonProcessingException {
+ String connector = resource.getConnector();
+ ApiAlertException.throwIfNull(connector, "the flink connector is null.");
+ FlinkConnector connectorResource = JacksonUtils.read(connector, FlinkConnector.class);
+ resource.setResourceName(connectorResource.getFactoryIdentifier());
+ Optional.ofNullable(connectorResource.getRequiredOptions())
+ .ifPresent(
+ v ->
+ resource.setConnectorRequiredOptions(
+ ExceptionUtils.wrapRuntimeException(v, JacksonUtils::write)));
+ Optional.ofNullable(connectorResource.getOptionalOptions())
+ .ifPresent(
+ v ->
+ resource.setConnectorOptionalOptions(
+ ExceptionUtils.wrapRuntimeException(v, JacksonUtils::write)));
+ }
+
+ private void check(Resource resource, List<String> jars, List<MavenPom> poms) {
+ ApiAlertException.throwIfTrue(
+ jars.isEmpty() && poms.isEmpty(), "Please add pom or jar resource.");
+ ApiAlertException.throwIfTrue(
+ resource.getResourceType() == ResourceTypeEnum.FLINK_APP && jars.isEmpty(),
+ "Please upload jar for Flink_App resource");
+ ApiAlertException.throwIfTrue(
+ jars.size() + poms.size() > 1, "Please do not add multi dependency at one time.");
+ }
+
@Override
public Resource findByResourceName(Long teamId, String name) {
LambdaQueryWrapper<Resource> queryWrapper =
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 7b303f0..2a570ba 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -58,6 +58,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
@@ -161,15 +162,7 @@
public void trigger(Long appId, @Nullable String savepointPath, @Nullable Boolean nativeFormat) {
log.info("Start to trigger savepoint for app {}", appId);
Application application = applicationManageService.getById(appId);
-
- ApplicationLog applicationLog = new ApplicationLog();
- applicationLog.setOptionName(OperationEnum.SAVEPOINT.getValue());
- applicationLog.setAppId(application.getId());
- applicationLog.setJobManagerUrl(application.getJobManagerUrl());
- applicationLog.setOptionTime(new Date());
- applicationLog.setYarnAppId(application.getClusterId());
- applicationLog.setUserId(commonService.getUserId());
-
+ ApplicationLog applicationLog = getApplicationLog(application);
FlinkAppHttpWatcher.addSavepoint(application.getId());
application.setOptionState(OptionStateEnum.SAVEPOINTING.getValue());
@@ -189,6 +182,18 @@
handleSavepointResponseFuture(application, applicationLog, savepointFuture);
}
+ @NotNull
+ private ApplicationLog getApplicationLog(Application application) {
+ ApplicationLog applicationLog = new ApplicationLog();
+ applicationLog.setOptionName(OperationEnum.SAVEPOINT.getValue());
+ applicationLog.setAppId(application.getId());
+ applicationLog.setJobManagerUrl(application.getJobManagerUrl());
+ applicationLog.setOptionTime(new Date());
+ applicationLog.setYarnAppId(application.getClusterId());
+ applicationLog.setUserId(commonService.getUserId());
+ return applicationLog;
+ }
+
@Override
public Boolean remove(Long id, Application appParam) throws InternalException {
SavePoint savePoint = getById(id);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java
index f891c44..291f1c7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java
@@ -98,13 +98,24 @@
String host = remoteURI.getHost();
String port = String.valueOf(remoteURI.getPort());
String clusterId = flinkCluster.getClusterId();
-
FlinkExecutionMode executionModeEnum = FlinkExecutionMode.of(flinkCluster.getExecutionMode());
- if (executionModeEnum == null) {
- throw new IllegalArgumentException("executionMode is null");
- }
streamParkConf.put("execution.target", executionModeEnum.getName());
+ renderConfByFlinkExecutionMode(
+ executionModeEnum, streamParkConf, host, port, clusterId, flinkCluster);
+
+ return sqlGateWayService.openSession(
+ new SessionEnvironment(
+ flinkGatewayId + flinkClusterId + UUID.randomUUID().toString(), null, streamParkConf));
+ }
+
+ private void renderConfByFlinkExecutionMode(
+ FlinkExecutionMode executionModeEnum,
+ Map<String, String> streamParkConf,
+ String host,
+ String port,
+ String clusterId,
+ FlinkCluster flinkCluster) {
switch (Objects.requireNonNull(executionModeEnum)) {
case REMOTE:
streamParkConf.put("rest.address", host);
@@ -135,10 +146,6 @@
default:
throw new IllegalArgumentException("Unsupported execution mode: " + executionModeEnum);
}
-
- return sqlGateWayService.openSession(
- new SessionEnvironment(
- flinkGatewayId + flinkClusterId + UUID.randomUUID().toString(), null, streamParkConf));
}
@Override
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index b4bf9c8..2073d3b 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -238,10 +238,47 @@
*/
private void getStateFromFlink(Application application) throws Exception {
JobsOverview jobsOverview = httpJobsOverview(application);
+ Optional<JobsOverview.Job> jobOptional = getJobsOverviewJob(application, jobsOverview);
+ if (jobOptional.isPresent()) {
+
+ processJobState(application, jobOptional);
+ }
+ }
+
+ private void processJobState(Application application, Optional<JobsOverview.Job> jobOptional)
+ throws Exception {
+ JobsOverview.Job jobOverview = jobOptional.get();
+ FlinkAppStateEnum currentState = FlinkAppStateEnum.of(jobOverview.getState());
+
+ if (FlinkAppStateEnum.OTHER != currentState) {
+ try {
+ // 1) set info from JobOverview
+ handleJobOverview(application, jobOverview);
+ } catch (Exception e) {
+ log.error("get flink jobOverview error: {}", e.getMessage(), e);
+ }
+ try {
+ // 2) CheckPoints
+ handleCheckPoints(application);
+ } catch (Exception e) {
+ log.error("get flink checkPoints error: {}", e.getMessage(), e);
+ }
+ // 3) savePoint obsolete check and NEED_START check
+ OptionStateEnum optionStateEnum = OPTIONING.get(application.getId());
+ if (FlinkAppStateEnum.RUNNING == currentState) {
+ handleRunningState(application, optionStateEnum, currentState);
+ } else {
+ handleNotRunState(application, optionStateEnum, currentState);
+ }
+ }
+ }
+
+ @Nonnull
+ private Optional<JobsOverview.Job> getJobsOverviewJob(
+ Application application, JobsOverview jobsOverview) {
Optional<JobsOverview.Job> optional;
FlinkExecutionMode execMode = application.getFlinkExecutionMode();
- if (FlinkExecutionMode.YARN_APPLICATION == execMode
- || FlinkExecutionMode.YARN_PER_JOB == execMode) {
+ if (FlinkExecutionMode.isYarnPerJobOrAppMode(execMode)) {
optional =
!jobsOverview.getJobs().isEmpty()
? jobsOverview.getJobs().stream()
@@ -254,33 +291,7 @@
.filter(x -> x.getId().equals(application.getJobId()))
.findFirst();
}
- if (optional.isPresent()) {
-
- JobsOverview.Job jobOverview = optional.get();
- FlinkAppStateEnum currentState = FlinkAppStateEnum.of(jobOverview.getState());
-
- if (FlinkAppStateEnum.OTHER != currentState) {
- try {
- // 1) set info from JobOverview
- handleJobOverview(application, jobOverview);
- } catch (Exception e) {
- log.error("get flink jobOverview error: {}", e.getMessage(), e);
- }
- try {
- // 2) CheckPoints
- handleCheckPoints(application);
- } catch (Exception e) {
- log.error("get flink jobOverview error: {}", e.getMessage(), e);
- }
- // 3) savePoint obsolete check and NEED_START check
- OptionStateEnum optionStateEnum = OPTIONING.get(application.getId());
- if (FlinkAppStateEnum.RUNNING == currentState) {
- handleRunningState(application, optionStateEnum, currentState);
- } else {
- handleNotRunState(application, optionStateEnum, currentState);
- }
- }
- }
+ return optional;
}
/**
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/SsoShiroPlugin.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/SsoShiroPlugin.java
index 326f1a6..94ab68c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/SsoShiroPlugin.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/SsoShiroPlugin.java
@@ -60,6 +60,20 @@
shiroService.addRealm(new Pac4jRealm());
// Construct the shiro filter for SSO
+ constructShiroFilterForSSO();
+
+ // Construct the filterChainDefinitionMap for SSO
+ LinkedHashMap<String, String> filterChainDefinitionMap = new LinkedHashMap<>();
+ filterChainDefinitionMap.put("/sso/signin", "ssoSecurityFilter");
+ filterChainDefinitionMap.put("/sso/token", "ssoSecurityFilter");
+ filterChainDefinitionMap.put("/pac4jLogout", "ssoLogoutFilter");
+ // Get callback endpoint from callbackUrl
+ String callbackEndpoint = URI.create(ssoConfig.getClients().getCallbackUrl()).getPath();
+ filterChainDefinitionMap.put(callbackEndpoint, "ssoCallbackFilter");
+ shiroService.addFilterChains(filterChainDefinitionMap);
+ }
+
+ private void constructShiroFilterForSSO() {
SecurityFilter securityFilter = new SecurityFilter();
CallbackFilter callbackFilter = new CallbackFilter();
LogoutFilter logoutFilter = new LogoutFilter();
@@ -72,15 +86,5 @@
filters.put("ssoCallbackFilter", callbackFilter);
filters.put("ssoLogoutFilter", logoutFilter);
shiroService.addFilters(filters);
-
- // Construct the filterChainDefinitionMap for SSO
- LinkedHashMap<String, String> filterChainDefinitionMap = new LinkedHashMap<>();
- filterChainDefinitionMap.put("/sso/signin", "ssoSecurityFilter");
- filterChainDefinitionMap.put("/sso/token", "ssoSecurityFilter");
- filterChainDefinitionMap.put("/pac4jLogout", "ssoLogoutFilter");
- // Get callback endpoint from callbackUrl
- String callbackEndpoint = URI.create(ssoConfig.getClients().getCallbackUrl()).getPath();
- filterChainDefinitionMap.put(callbackEndpoint, "ssoCallbackFilter");
- shiroService.addFilterChains(filterChainDefinitionMap);
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/AuthenticatorImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/AuthenticatorImpl.java
index 44e1eb9..2269b9e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/AuthenticatorImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/AuthenticatorImpl.java
@@ -64,7 +64,8 @@
ApiAlertException.throwIfTrue(
user.getLoginType() != LoginTypeEnum.PASSWORD,
- String.format("user [%s] can not login with PASSWORD", username));
+ "user [%s] can not login with PASSWORD",
+ username);
String salt = user.getSalt();
password = ShaHashUtils.encrypt(salt, password);
@@ -86,7 +87,9 @@
if (user != null) {
ApiAlertException.throwIfTrue(
user.getLoginType() != LoginTypeEnum.LDAP,
- String.format("user [%s] can only sign in with %s", username, user.getLoginType()));
+ "user [%s] can only sign in with %s",
+ username,
+ user.getLoginType());
return user;
}
@@ -100,7 +103,9 @@
if (user != null) {
ApiAlertException.throwIfTrue(
user.getLoginType() != LoginTypeEnum.SSO,
- String.format("user [%s] can only sign in with %s", username, user.getLoginType()));
+ "user [%s] can only sign in with %s",
+ username,
+ user.getLoginType());
return user;
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/LdapService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/LdapService.java
index e8215a1..45a2a73 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/LdapService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/security/impl/LdapService.java
@@ -73,10 +73,47 @@
* @return boolean ldapLoginStatus
*/
public boolean ldapLogin(String userId, String userPwd) {
-
ApiAlertException.throwIfFalse(
enable, "ldap is not enabled, Please check the configuration: ldap.enable");
+ renderLdapEnv();
+ try {
+ NamingEnumeration<SearchResult> results = getSearchResults(userId);
+ if (!results.hasMore()) {
+ return false;
+ }
+ SearchResult result = results.next();
+ NamingEnumeration<? extends Attribute> attrs = result.getAttributes().getAll();
+ while (attrs.hasMore()) {
+ ldapEnv.put(Context.SECURITY_PRINCIPAL, result.getNameInNamespace());
+ ldapEnv.put(Context.SECURITY_CREDENTIALS, userPwd);
+ try {
+ new InitialDirContext(ldapEnv);
+ } catch (Exception e) {
+ log.warn("invalid ldap credentials or ldap search error", e);
+ return false;
+ }
+ Attribute attr = attrs.next();
+ if (attr.getID().equals(ldapUserIdentifyingAttribute)) {
+ return true;
+ }
+ }
+ } catch (NamingException e) {
+ log.error("ldap search error", e);
+ }
+ return false;
+ }
+ private NamingEnumeration<SearchResult> getSearchResults(String userId) throws NamingException {
+ LdapContext ctx = new InitialLdapContext(ldapEnv, null);
+ SearchControls sc = new SearchControls();
+ sc.setReturningAttributes(new String[] {ldapUserIdentifyingAttribute});
+ sc.setSearchScope(SearchControls.SUBTREE_SCOPE);
+ EqualsFilter filter = new EqualsFilter(ldapUserIdentifyingAttribute, userId);
+ NamingEnumeration<SearchResult> results = ctx.search(ldapBaseDn, filter.toString(), sc);
+ return results;
+ }
+
+ private void renderLdapEnv() {
if (ldapEnv == null) {
ldapEnv = new Properties();
ldapEnv.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
@@ -86,36 +123,5 @@
ldapEnv.put(Context.SECURITY_PRINCIPAL, ldapSecurityPrincipal);
ldapEnv.put(Context.SECURITY_CREDENTIALS, ldapPrincipalPassword);
-
- try {
- LdapContext ctx = new InitialLdapContext(ldapEnv, null);
- SearchControls sc = new SearchControls();
- sc.setReturningAttributes(new String[] {ldapUserIdentifyingAttribute});
- sc.setSearchScope(SearchControls.SUBTREE_SCOPE);
- EqualsFilter filter = new EqualsFilter(ldapUserIdentifyingAttribute, userId);
- NamingEnumeration<SearchResult> results = ctx.search(ldapBaseDn, filter.toString(), sc);
- if (results.hasMore()) {
- SearchResult result = results.next();
- NamingEnumeration<? extends Attribute> attrs = result.getAttributes().getAll();
- while (attrs.hasMore()) {
- ldapEnv.put(Context.SECURITY_PRINCIPAL, result.getNameInNamespace());
- ldapEnv.put(Context.SECURITY_CREDENTIALS, userPwd);
- try {
- new InitialDirContext(ldapEnv);
- } catch (Exception e) {
- log.warn("invalid ldap credentials or ldap search error", e);
- return false;
- }
- Attribute attr = attrs.next();
- if (attr.getID().equals(ldapUserIdentifyingAttribute)) {
- return true;
- }
- }
- }
- } catch (NamingException e) {
- log.error("ldap search error", e);
- return false;
- }
- return false;
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
index 90f7975..e2154d0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
@@ -42,7 +42,6 @@
import java.util.Arrays;
import java.util.Date;
import java.util.List;
-import java.util.Optional;
import java.util.stream.Collectors;
@Service
@@ -119,28 +118,18 @@
@Override
public void createMember(Member member) {
- User user =
- Optional.ofNullable(userService.getByUsername(member.getUserName()))
- .orElseThrow(
- () ->
- new ApiAlertException(
- String.format("The username [%s] not found", member.getUserName())));
- Optional.ofNullable(roleService.getById(member.getRoleId()))
- .orElseThrow(
- () ->
- new ApiAlertException(
- String.format("The roleId [%s] not found", member.getRoleId())));
- Team team =
- Optional.ofNullable(teamService.getById(member.getTeamId()))
- .orElseThrow(
- () ->
- new ApiAlertException(
- String.format("The teamId [%s] not found", member.getTeamId())));
- ApiAlertException.throwIfFalse(
- findByUserId(member.getTeamId(), user.getUserId()) == null,
- String.format(
- "The user [%s] has been added the team [%s], please don't add it again.",
- member.getUserName(), team.getTeamName()));
+ User user = userService.getByUsername(member.getUserName());
+ ApiAlertException.throwIfNull(user, "The username [%s] not found", member.getUserName());
+
+ ApiAlertException.throwIfNull(
+ roleService.getById(member.getRoleId()), "The roleId [%s] not found", member.getRoleId());
+ Team team = teamService.getById(member.getTeamId());
+ ApiAlertException.throwIfNull(team, "The teamId [%s] not found", member.getTeamId());
+ ApiAlertException.throwIfNotNull(
+ findByUserId(member.getTeamId(), user.getUserId()),
+ "The user [%s] has been added the team [%s], please don't add it again.",
+ member.getUserName(),
+ team.getTeamName());
member.setId(null);
member.setUserId(user.getUserId());
@@ -153,29 +142,20 @@
@Override
public void remove(Long id) {
- Member member =
- Optional.ofNullable(this.getById(id))
- .orElseThrow(
- () -> new ApiAlertException(String.format("The member [id=%s] not found", id)));
+ Member member = this.getById(id);
+ ApiAlertException.throwIfNull(member, "The member [id=%s] not found", id);
this.removeById(member);
userService.clearLastTeam(member.getUserId(), member.getTeamId());
}
@Override
public void updateMember(Member member) {
- Member oldMember =
- Optional.ofNullable(this.getById(member.getId()))
- .orElseThrow(
- () ->
- new ApiAlertException(
- String.format("The member [id=%s] not found", member.getId())));
+ Member oldMember = this.getById(member.getId());
+ ApiAlertException.throwIfNull(oldMember, "The member [id=%s] not found", member.getId());
Utils.required(oldMember.getTeamId().equals(member.getTeamId()), "Team id cannot be changed.");
Utils.required(oldMember.getUserId().equals(member.getUserId()), "User id cannot be changed.");
- Optional.ofNullable(roleService.getById(member.getRoleId()))
- .orElseThrow(
- () ->
- new ApiAlertException(
- String.format("The roleId [%s] not found", member.getRoleId())));
+ ApiAlertException.throwIfNull(
+ roleService.getById(member.getRoleId()), "The roleId [%s] not found", member.getRoleId());
oldMember.setRoleId(member.getRoleId());
updateById(oldMember);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java
index d7079a3..fea8bb8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/TeamServiceImpl.java
@@ -96,22 +96,22 @@
log.info("{} Proceed delete team[Id={}]", commonService.getCurrentUser().getUsername(), teamId);
Team team = this.getById(teamId);
- ApiAlertException.throwIfNull(team, String.format("The team[Id=%s] doesn't exist.", teamId));
+ ApiAlertException.throwIfNull(team, "The team[Id=%s] doesn't exist.", teamId);
ApiAlertException.throwIfTrue(
applicationInfoService.existsByTeamId(teamId),
- String.format(
- "Please delete the applications under the team[name=%s] first!", team.getTeamName()));
+ "Please delete the applications under the team[name=%s] first!",
+ team.getTeamName());
ApiAlertException.throwIfTrue(
projectService.existsByTeamId(teamId),
- String.format(
- "Please delete the projects under the team[name=%s] first!", team.getTeamName()));
+ "Please delete the projects under the team[name=%s] first!",
+ team.getTeamName());
ApiAlertException.throwIfTrue(
variableService.existsByTeamId(teamId),
- String.format(
- "Please delete the variables under the team[name=%s] first!", team.getTeamName()));
+ "Please delete the variables under the team[name=%s] first!",
+ team.getTeamName());
memberService.removeByTeamId(teamId);
userService.clearLastTeam(teamId);