Fix flink-1.16 ClassNotFoundException bug (#5001)
* Fix flink-1.16 ClassNotFoundException bug
* Fix flink-1.16 ClassNotFoundException bug
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
index 6b521dc..bcd721c 100644
--- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
@@ -38,7 +38,7 @@
val FLINK_DIST_JAR_PATH = CommonVars(
"flink.dist.jar.path",
- FLINK_HOME.getValue + s"/lib/flink-dist_2.11-${FLINK_VERSION.getValue}.jar"
+ FLINK_HOME.getValue + s"/lib/flink-dist-${FLINK_VERSION.getValue}.jar"
)
val FLINK_PROVIDED_LIB_PATH = CommonVars("flink.lib.path", "")
@@ -58,7 +58,9 @@
"The local lib path of each user in Flink EngineConn."
)
- val FLINK_SHIP_DIRECTORIES = CommonVars("flink.yarn.ship-directories", "")
+ val FLINK_SHIP_DIRECTORIES =
+ CommonVars("flink.yarn.ship-directories", FLINK_HOME.getValue + "/lib")
+
val FLINK_SHIP_REMOTE_DIRECTORIES = CommonVars("flink.yarn.remote.ship-directories", "")
val FLINK_CHECK_POINT_ENABLE = CommonVars("flink.app.checkpoint.enable", false)
diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
index 1c6db3b..1b9759d 100644
--- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
@@ -108,7 +108,13 @@
val flinkHome = FLINK_HOME.getValue(options)
val flinkConfDir = FLINK_CONF_DIR.getValue(options)
val flinkProvidedLibPath = FLINK_PROVIDED_LIB_PATH.getValue(options)
- val flinkDistJarPath = FLINK_DIST_JAR_PATH.getValue(options)
+ val flinkVersion = FlinkEnvConfiguration.FLINK_VERSION.getValue(options)
+ var flinkDistJarPath = FLINK_DIST_JAR_PATH.getValue(options)
+ if (
+ StringUtils.isNotBlank(flinkVersion) && flinkVersion.equalsIgnoreCase(FLINK_1_12_2_VERSION)
+ ) {
+ flinkDistJarPath = flinkDistJarPath.replaceFirst("flink-dist", "flink-dist_2.11")
+ }
// Local lib path
val providedLibDirsArray = FLINK_LIB_LOCAL_PATH.getValue(options).split(",")
// Ship directories
@@ -126,7 +132,6 @@
)
}
otherParams.put(GovernanceCommonConf.EC_APP_MANAGE_MODE.key, flinkClientType.toLowerCase())
- val flinkVersion = FlinkEnvConfiguration.FLINK_VERSION.getValue(options)
FlinkVersionThreadLocal.setFlinkVersion(flinkVersion)
val context = new EnvironmentContext(
defaultEnv,