[ISSUES-3627] Fix Adding flink1.19 environment configuration exception (#3629)
* Added flink conf compatibility, flink-conf.yaml changed to conf.yaml after flink 1.19
---------
Co-authored-by: qinjiyong <qinjiyong@jorme.sg>
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index b56edf6..d8a2134 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -20,6 +20,7 @@
import org.apache.streampark.common.conf.FlinkVersion;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.commons.io.FileUtils;
@@ -72,8 +73,29 @@
private transient String streamParkScalaVersion = scala.util.Properties.versionNumberString();
public void doSetFlinkConf() throws ApiDetailException {
+
+ File yaml;
+ float ver = Float.parseFloat(getVersionOfFirst().concat(".").concat(getVersionOfMiddle()));
+ if (ver < 1.19f) {
+ yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
+ if (!yaml.exists()) {
+ throw new ApiAlertException("cannot find flink-conf.yaml in flink/conf ");
+ }
+ } else if (ver == 1.19f) {
+ yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
+ if (!yaml.exists()) {
+ yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
+ }
+ if (!yaml.exists()) {
+ throw new ApiAlertException("cannot find config.yaml|flink-conf.yaml in flink/conf ");
+ }
+ } else {
+ yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
+ if (!yaml.exists()) {
+ throw new ApiAlertException("cannot find config.yaml in flink/conf ");
+ }
+ }
try {
- File yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
String flinkConf = FileUtils.readFileToString(yaml, StandardCharsets.UTF_8);
this.flinkConf = DeflaterUtils.zipString(flinkConf);
} catch (Exception e) {