[ISSUE-3091][Improve] Improve streampark-flink module based on [3.10 Pre-Conditions Checking] (#3647)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index b74ecae..4829c95 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -21,7 +21,7 @@
import org.apache.streampark.common.conf.{FlinkVersion, Workspace}
import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.streampark.common.enums._
-import org.apache.streampark.common.util.{DeflaterUtils, HdfsUtils, PropertiesUtils}
+import org.apache.streampark.common.util.{AssertUtils, DeflaterUtils, HdfsUtils, PropertiesUtils}
import org.apache.streampark.flink.packer.pipeline.{BuildResult, ShadedBuildResponse}
import org.apache.streampark.flink.util.FlinkUtils
import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper
@@ -181,27 +181,25 @@
def checkBuildResult(): Unit = {
executionMode match {
case FlinkExecutionMode.KUBERNETES_NATIVE_SESSION =>
- if (buildResult == null) {
- throw new Exception(
- s"[flink-submit] current job: ${this.effectiveAppName} was not yet built, buildResult is empty" +
- s",clusterId=${k8sSubmitParam.clusterId}," +
- s",namespace=${k8sSubmitParam.kubernetesNamespace}")
- }
- if (!buildResult.pass) {
- throw new Exception(
- s"[flink-submit] current job ${this.effectiveAppName} build failed, clusterId" +
- s",clusterId=${k8sSubmitParam.clusterId}," +
- s",namespace=${k8sSubmitParam.kubernetesNamespace}")
- }
+ AssertUtils.required(
+ buildResult != null,
+ s"[flink-submit] current job: ${this.effectiveAppName} was not yet built, buildResult is empty" +
s",clusterId=${k8sSubmitParam.clusterId}" +
+ s",namespace=${k8sSubmitParam.kubernetesNamespace}"
+ )
+ AssertUtils.required(
+ buildResult.pass,
s"[flink-submit] current job ${this.effectiveAppName} build failed" +
s",clusterId=${k8sSubmitParam.clusterId}" +
+ s",namespace=${k8sSubmitParam.kubernetesNamespace}"
+ )
case _ =>
- if (this.buildResult == null) {
- throw new Exception(
- s"[flink-submit] current job: ${this.effectiveAppName} was not yet built, buildResult is empty")
- }
- if (!this.buildResult.pass) {
- throw new Exception(
- s"[flink-submit] current job ${this.effectiveAppName} build failed, please check")
- }
+ AssertUtils.required(
+ this.buildResult != null,
+ s"[flink-submit] current job: ${this.effectiveAppName} was not yet built, buildResult is empty")
+ AssertUtils.required(
+ this.buildResult.pass,
+ s"[flink-submit] current job ${this.effectiveAppName} build failed, please check")
}
}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index b2d3c84..f043bd1 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -21,7 +21,7 @@
import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.enums.FlinkDevelopmentMode
import org.apache.streampark.common.fs.FsOperator
-import org.apache.streampark.common.util.{FileUtils, HdfsUtils, Utils}
+import org.apache.streampark.common.util.{AssertUtils, FileUtils, HdfsUtils, Utils}
import org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
@@ -59,10 +59,10 @@
logDebug(s"kerberos Security is Enabled...")
val useTicketCache =
flinkDefaultConfiguration.get(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE)
- if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
- throw new RuntimeException(
- s"Hadoop security with Kerberos is enabled but the login user $currentUser does not have Kerberos credentials or delegation tokens!")
- }
+ AssertUtils.required(
+ HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache),
+ s"Hadoop security with Kerberos is enabled but the login user $currentUser does not have Kerberos credentials or delegation tokens!"
+ )
}
val providedLibs = {
val array = ListBuffer(
@@ -100,9 +100,7 @@
if (submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK) {
val pyVenv: String = workspace.APP_PYTHON_VENV
- if (!FsOperator.hdfs.exists(pyVenv)) {
- throw new RuntimeException(s"$pyVenv File does not exist")
- }
+ AssertUtils.required(FsOperator.hdfs.exists(pyVenv), s"$pyVenv File does not exist")
val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 9aaa133..d7162fd 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.client.impl
-import org.apache.streampark.common.util.Utils
+import org.apache.streampark.common.util.{AssertUtils, Utils}
import org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
@@ -69,10 +69,10 @@
logDebug(s"kerberos Security is Enabled...")
val useTicketCache =
flinkDefaultConfiguration.get(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE)
- if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
- throw new RuntimeException(
- s"Hadoop security with Kerberos is enabled but the login user $currentUser does not have Kerberos credentials or delegation tokens!")
- }
+ AssertUtils.required(
+ HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache),
+ s"Hadoop security with Kerberos is enabled but the login user $currentUser does not have Kerberos credentials or delegation tokens!"
+ )
}
val shipFiles = new util.ArrayList[String]()
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
index 63d5e8c..b47a555 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.client.tool
-import org.apache.streampark.common.util.Logger
+import org.apache.streampark.common.util.{AssertUtils, Logger}
import org.apache.streampark.flink.kubernetes.KubernetesRetriever
import org.apache.flink.client.deployment.application.ApplicationConfiguration
@@ -82,10 +82,9 @@
case Failure(_) => null
}
- if (!jarUploadResponse.isSuccessful) {
- throw new Exception(
- s"[flink-submit] upload flink jar to flink session cluster failed, jmRestUrl=$jmRestUrl, response=$jarUploadResponse")
- }
+ AssertUtils.required(
+ jarUploadResponse.isSuccessful,
+ s"[flink-submit] upload flink jar to flink session cluster failed, jmRestUrl=$jmRestUrl, response=$jarUploadResponse")
// refer to https://ci.apache.org/projects/flink/flink-docs-stable/docs/ops/rest_api/#jars-upload
val resp = Request
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 5cf2a9d..a707289 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -266,9 +266,8 @@
submitRequest.developmentMode match {
case FlinkDevelopmentMode.PYFLINK =>
val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
- if (!FsOperator.lfs.exists(pythonVenv)) {
- throw new RuntimeException(s"$pythonVenv File does not exist")
- }
+ AssertUtils.required(FsOperator.lfs.exists(pythonVenv), s"$pythonVenv File does not exist")
+
flinkConfig
// python.archives
.safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
@@ -573,11 +572,10 @@
}
)
- if (StringUtils.isBlank(configDir)) {
- throw new FlinkException(
- s"[StreamPark] executionMode: ${request.executionMode.getName}, savePoint path is null or invalid.")
- } else configDir
-
+ AssertUtils.required(
+ StringUtils.isNotBlank(configDir),
+ s"[StreamPark] executionMode: ${request.executionMode.getName}, savePoint path is null or invalid.")
+ configDir
}
}
}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 4e58a50..243ad87 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.client.`trait`
-import org.apache.streampark.common.util.{ExceptionUtils, Utils}
+import org.apache.streampark.common.util.{AssertUtils, ExceptionUtils, Utils}
import org.apache.streampark.common.util.ImplicitsUtils._
import org.apache.streampark.flink.client.bean._
@@ -47,10 +47,10 @@
flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, savepointRequestTrait.clusterId)
val clusterClientFactory = new YarnClusterClientFactory
val applicationId = clusterClientFactory.getClusterId(flinkConf)
- if (applicationId == null) {
- throw new FlinkException(
- "[StreamPark] getClusterClient error. No cluster id was specified. Please specify a cluster to which you would like to connect.")
- }
+ AssertUtils.required(
+ applicationId != null,
+ "[StreamPark] getClusterClient error. No cluster id was specified. Please specify a cluster to which you would like to connect.")
+
val clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConf)
clusterDescriptor
.retrieve(applicationId)
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 186d444..1dea1b4 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -20,7 +20,7 @@
import org.apache.streampark.common.Constant
import org.apache.streampark.common.conf.{InternalConfigHolder, Workspace}
import org.apache.streampark.common.conf.CommonConfig.{MAVEN_AUTH_PASSWORD, MAVEN_AUTH_USER, MAVEN_REMOTE_URL}
-import org.apache.streampark.common.util.{Logger, Utils}
+import org.apache.streampark.common.util.{AssertUtils, Logger, Utils}
import com.google.common.collect.Lists
import org.apache.maven.plugins.shade.{DefaultShader, ShadeRequest}
@@ -161,9 +161,10 @@
@Nonnull outFatJarPath: String): File = {
val jarLibs = dependencyInfo.extJarLibs
val arts = dependencyInfo.mavenArts
- if (jarLibs.isEmpty && arts.isEmpty) {
- throw new Exception(s"[StreamPark] streampark-packer: empty artifacts.")
- }
+ AssertUtils.required(
+ !(jarLibs.isEmpty && arts.isEmpty),
+ s"[StreamPark] streampark-packer: empty artifacts.")
+
val artFilePaths = resolveArtifacts(arts).map(_.getAbsolutePath)
buildFatJar(mainClass, jarLibs ++ artFilePaths, outFatJarPath)
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index 5d9af1e..d35c484 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.core
import org.apache.streampark.common.conf.ConfigKeys.KEY_FLINK_SQL
-import org.apache.streampark.common.util.Logger
+import org.apache.streampark.common.util.{AssertUtils, Logger}
import org.apache.streampark.flink.core.SqlCommand._
import org.apache.commons.lang3.StringUtils
@@ -125,12 +125,12 @@
logError("StreamPark dose not support 'SELECT' statement now!")
throw new RuntimeException("StreamPark dose not support 'select' statement now!")
case DELETE | UPDATE =>
- if (runMode == "STREAMING") {
- throw new UnsupportedOperationException(
- s"Currently, ${command.toUpperCase()} statement only supports in batch mode, " +
- s"and it requires the target table connector implements the SupportsRowLevelDelete, " +
- s"For more details please refer to: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/$command")
- }
+ AssertUtils.required(
+ runMode != "STREAMING",
+ s"Currently, ${command.toUpperCase()} statement only supports in batch mode, " +
+ s"and it requires the target table connector implements the SupportsRowLevelDelete, " +
+ s"For more details please refer to: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/$command"
+ )
case _ =>
try {
lock.lock()
diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ConfigOption.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ConfigOption.java
index c6e89a6..16b8614 100644
--- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ConfigOption.java
+++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ConfigOption.java
@@ -17,7 +17,7 @@
package org.apache.streampark.gateway;
-import javax.annotation.Nullable;
+import org.apache.streampark.common.util.AssertUtils;
import java.time.Duration;
import java.util.Map;
@@ -82,7 +82,7 @@
* @return The builder for the config option with the given key.
*/
public static OptionBuilder key(String key) {
- checkNotNull(key);
+ AssertUtils.notNull(key);
return new OptionBuilder(key);
}
@@ -196,7 +196,7 @@
*/
@Deprecated
public <T> ConfigOption<T> defaultValue(T value) {
- checkNotNull(value);
+ AssertUtils.notNull(value);
return new ConfigOption<>(key, value, ConfigOption.EMPTY_DESCRIPTION, value.getClass());
}
@@ -246,11 +246,4 @@
return new ConfigOption<>(key, null, ConfigOption.EMPTY_DESCRIPTION, clazz);
}
}
-
- public static <T> T checkNotNull(@Nullable T reference) {
- if (reference == null) {
- throw new NullPointerException();
- }
- return reference;
- }
}
diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java
index 348c738..8153258 100644
--- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java
+++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java
@@ -29,12 +29,6 @@
/** Flink sql gateway's Factory for {@link SqlGatewayService}. */
public class FlinkSqlGatewayServiceFactory implements SqlGatewayServiceFactory {
- public static final ConfigOption<String> BASE_URI =
- ConfigOption.key("base-uri")
- .stringType()
- .noDefaultValue()
- .withDescription("The base uri of the flink cluster.");
-
@Override
public String factoryIdentifier() {
return "flink-v1";
@@ -60,4 +54,10 @@
String baseUri = context.getGateWayServiceOptions().get(BASE_URI.getKey());
return new FlinkSqlGatewayImpl(baseUri);
}
+
+ public static final ConfigOption<String> BASE_URI =
+ ConfigOption.key("base-uri")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The base uri of the flink cluster.");
}