[ISSUE-3091][Improve] Improve streampark-flink module based on [3.10 Pre-Conditions Checking] (#3647)

[ISSUE-3091][Improve] Improve streampark-flink module based on [3.10 Pre-Conditions Checking] (#3647)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index b74ecae..4829c95 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -21,7 +21,7 @@
 import org.apache.streampark.common.conf.{FlinkVersion, Workspace}
 import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.enums._
-import org.apache.streampark.common.util.{DeflaterUtils, HdfsUtils, PropertiesUtils}
+import org.apache.streampark.common.util.{AssertUtils, DeflaterUtils, HdfsUtils, PropertiesUtils}
 import org.apache.streampark.flink.packer.pipeline.{BuildResult, ShadedBuildResponse}
 import org.apache.streampark.flink.util.FlinkUtils
 import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper
@@ -181,27 +181,25 @@
   def checkBuildResult(): Unit = {
     executionMode match {
       case FlinkExecutionMode.KUBERNETES_NATIVE_SESSION =>
-        if (buildResult == null) {
-          throw new Exception(
-            s"[flink-submit] current job: ${this.effectiveAppName} was not yet built, buildResult is empty" +
-              s",clusterId=${k8sSubmitParam.clusterId}," +
-              s",namespace=${k8sSubmitParam.kubernetesNamespace}")
-        }
-        if (!buildResult.pass) {
-          throw new Exception(
-            s"[flink-submit] current job ${this.effectiveAppName} build failed, clusterId" +
-              s",clusterId=${k8sSubmitParam.clusterId}," +
-              s",namespace=${k8sSubmitParam.kubernetesNamespace}")
-        }
+        AssertUtils.required(
+          buildResult != null,
+          s"[flink-submit] current job: ${this.effectiveAppName} was not yet built, buildResult is empty" +
+            s",clusterId=${k8sSubmitParam.clusterId}," +
+            s",namespace=${k8sSubmitParam.kubernetesNamespace}"
+        )
+        AssertUtils.required(
+          buildResult.pass,
+          s"[flink-submit] current job ${this.effectiveAppName} build failed, clusterId" +
+            s",clusterId=${k8sSubmitParam.clusterId}," +
+            s",namespace=${k8sSubmitParam.kubernetesNamespace}"
+        )
       case _ =>
-        if (this.buildResult == null) {
-          throw new Exception(
-            s"[flink-submit] current job: ${this.effectiveAppName} was not yet built, buildResult is empty")
-        }
-        if (!this.buildResult.pass) {
-          throw new Exception(
-            s"[flink-submit] current job ${this.effectiveAppName} build failed, please check")
-        }
+        AssertUtils.required(
+          this.buildResult != null,
+          s"[flink-submit] current job: ${this.effectiveAppName} was not yet built, buildResult is empty")
+        AssertUtils.required(
+          this.buildResult.pass,
+          s"[flink-submit] current job ${this.effectiveAppName} build failed, please check")
     }
   }
 
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index b2d3c84..f043bd1 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -21,7 +21,7 @@
 import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.enums.FlinkDevelopmentMode
 import org.apache.streampark.common.fs.FsOperator
-import org.apache.streampark.common.util.{FileUtils, HdfsUtils, Utils}
+import org.apache.streampark.common.util.{AssertUtils, FileUtils, HdfsUtils, Utils}
 import org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
@@ -59,10 +59,10 @@
       logDebug(s"kerberos Security is Enabled...")
       val useTicketCache =
         flinkDefaultConfiguration.get(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE)
-      if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
-        throw new RuntimeException(
-          s"Hadoop security with Kerberos is enabled but the login user $currentUser does not have Kerberos credentials or delegation tokens!")
-      }
+      AssertUtils.required(
+        HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache),
+        s"Hadoop security with Kerberos is enabled but the login user $currentUser does not have Kerberos credentials or delegation tokens!"
+      )
     }
     val providedLibs = {
       val array = ListBuffer(
@@ -100,9 +100,7 @@
 
     if (submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK) {
       val pyVenv: String = workspace.APP_PYTHON_VENV
-      if (!FsOperator.hdfs.exists(pyVenv)) {
-        throw new RuntimeException(s"$pyVenv File does not exist")
-      }
+      AssertUtils.required(FsOperator.hdfs.exists(pyVenv), s"$pyVenv File does not exist")
 
       val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
       if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 9aaa133..d7162fd 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.client.impl
 
-import org.apache.streampark.common.util.Utils
+import org.apache.streampark.common.util.{AssertUtils, Utils}
 import org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 
@@ -69,10 +69,10 @@
       logDebug(s"kerberos Security is Enabled...")
       val useTicketCache =
         flinkDefaultConfiguration.get(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE)
-      if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
-        throw new RuntimeException(
-          s"Hadoop security with Kerberos is enabled but the login user $currentUser does not have Kerberos credentials or delegation tokens!")
-      }
+      AssertUtils.required(
+        HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache),
+        s"Hadoop security with Kerberos is enabled but the login user $currentUser does not have Kerberos credentials or delegation tokens!"
+      )
     }
 
     val shipFiles = new util.ArrayList[String]()
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
index 63d5e8c..b47a555 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.client.tool
 
-import org.apache.streampark.common.util.Logger
+import org.apache.streampark.common.util.{AssertUtils, Logger}
 import org.apache.streampark.flink.kubernetes.KubernetesRetriever
 
 import org.apache.flink.client.deployment.application.ApplicationConfiguration
@@ -82,10 +82,9 @@
       case Failure(_) => null
     }
 
-    if (!jarUploadResponse.isSuccessful) {
-      throw new Exception(
-        s"[flink-submit] upload flink jar to flink session cluster failed, jmRestUrl=$jmRestUrl, response=$jarUploadResponse")
-    }
+    AssertUtils.required(
+      jarUploadResponse.isSuccessful,
+      s"[flink-submit] upload flink jar to flink session cluster failed, jmRestUrl=$jmRestUrl, response=$jarUploadResponse")
 
     // refer to https://ci.apache.org/projects/flink/flink-docs-stable/docs/ops/rest_api/#jars-upload
     val resp = Request
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 5cf2a9d..a707289 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -266,9 +266,8 @@
     submitRequest.developmentMode match {
       case FlinkDevelopmentMode.PYFLINK =>
         val pythonVenv: String = Workspace.local.APP_PYTHON_VENV
-        if (!FsOperator.lfs.exists(pythonVenv)) {
-          throw new RuntimeException(s"$pythonVenv File does not exist")
-        }
+        AssertUtils.required(FsOperator.lfs.exists(pythonVenv), s"$pythonVenv File does not exist")
+
         flinkConfig
           // python.archives
           .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
@@ -573,11 +572,10 @@
             }
         )
 
-        if (StringUtils.isBlank(configDir)) {
-          throw new FlinkException(
-            s"[StreamPark] executionMode: ${request.executionMode.getName}, savePoint path is null or invalid.")
-        } else configDir
-
+        AssertUtils.required(
+          StringUtils.isNotBlank(configDir),
+          s"[StreamPark] executionMode: ${request.executionMode.getName}, savePoint path is null or invalid.")
+        configDir
       }
     }
   }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 4e58a50..243ad87 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.client.`trait`
 
-import org.apache.streampark.common.util.{ExceptionUtils, Utils}
+import org.apache.streampark.common.util.{AssertUtils, ExceptionUtils, Utils}
 import org.apache.streampark.common.util.ImplicitsUtils._
 import org.apache.streampark.flink.client.bean._
 
@@ -47,10 +47,10 @@
     flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, savepointRequestTrait.clusterId)
     val clusterClientFactory = new YarnClusterClientFactory
     val applicationId = clusterClientFactory.getClusterId(flinkConf)
-    if (applicationId == null) {
-      throw new FlinkException(
-        "[StreamPark] getClusterClient error. No cluster id was specified. Please specify a cluster to which you would like to connect.")
-    }
+    AssertUtils.required(
+      applicationId != null,
+      "[StreamPark] getClusterClient error. No cluster id was specified. Please specify a cluster to which you would like to connect.")
+
     val clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConf)
     clusterDescriptor
       .retrieve(applicationId)
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 186d444..1dea1b4 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -20,7 +20,7 @@
 import org.apache.streampark.common.Constant
 import org.apache.streampark.common.conf.{InternalConfigHolder, Workspace}
 import org.apache.streampark.common.conf.CommonConfig.{MAVEN_AUTH_PASSWORD, MAVEN_AUTH_USER, MAVEN_REMOTE_URL}
-import org.apache.streampark.common.util.{Logger, Utils}
+import org.apache.streampark.common.util.{AssertUtils, Logger, Utils}
 
 import com.google.common.collect.Lists
 import org.apache.maven.plugins.shade.{DefaultShader, ShadeRequest}
@@ -161,9 +161,10 @@
       @Nonnull outFatJarPath: String): File = {
     val jarLibs = dependencyInfo.extJarLibs
     val arts = dependencyInfo.mavenArts
-    if (jarLibs.isEmpty && arts.isEmpty) {
-      throw new Exception(s"[StreamPark] streampark-packer: empty artifacts.")
-    }
+    AssertUtils.required(
+      !(jarLibs.isEmpty && arts.isEmpty),
+      s"[StreamPark] streampark-packer: empty artifacts.")
+
     val artFilePaths = resolveArtifacts(arts).map(_.getAbsolutePath)
     buildFatJar(mainClass, jarLibs ++ artFilePaths, outFatJarPath)
   }
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index 5d9af1e..d35c484 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -17,7 +17,7 @@
 package org.apache.streampark.flink.core
 
 import org.apache.streampark.common.conf.ConfigKeys.KEY_FLINK_SQL
-import org.apache.streampark.common.util.Logger
+import org.apache.streampark.common.util.{AssertUtils, Logger}
 import org.apache.streampark.flink.core.SqlCommand._
 
 import org.apache.commons.lang3.StringUtils
@@ -125,12 +125,12 @@
               logError("StreamPark dose not support 'SELECT' statement now!")
               throw new RuntimeException("StreamPark dose not support 'select' statement now!")
             case DELETE | UPDATE =>
-              if (runMode == "STREAMING") {
-                throw new UnsupportedOperationException(
-                  s"Currently, ${command.toUpperCase()} statement only supports in batch mode, " +
-                    s"and it requires the target table connector implements the SupportsRowLevelDelete, " +
-                    s"For more details please refer to: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/$command")
-              }
+              AssertUtils.required(
+                runMode != "STREAMING",
+                s"Currently, ${command.toUpperCase()} statement only supports in batch mode, " +
+                  s"and it requires the target table connector implements the SupportsRowLevelDelete, " +
+                  s"For more details please refer to: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/$command"
+              )
             case _ =>
               try {
                 lock.lock()
diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ConfigOption.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ConfigOption.java
index c6e89a6..16b8614 100644
--- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ConfigOption.java
+++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ConfigOption.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.gateway;
 
-import javax.annotation.Nullable;
+import org.apache.streampark.common.util.AssertUtils;
 
 import java.time.Duration;
 import java.util.Map;
@@ -82,7 +82,7 @@
    * @return The builder for the config option with the given key.
    */
   public static OptionBuilder key(String key) {
-    checkNotNull(key);
+    AssertUtils.notNull(key);
     return new OptionBuilder(key);
   }
 
@@ -196,7 +196,7 @@
      */
     @Deprecated
     public <T> ConfigOption<T> defaultValue(T value) {
-      checkNotNull(value);
+      AssertUtils.notNull(value);
       return new ConfigOption<>(key, value, ConfigOption.EMPTY_DESCRIPTION, value.getClass());
     }
 
@@ -246,11 +246,4 @@
       return new ConfigOption<>(key, null, ConfigOption.EMPTY_DESCRIPTION, clazz);
     }
   }
-
-  public static <T> T checkNotNull(@Nullable T reference) {
-    if (reference == null) {
-      throw new NullPointerException();
-    }
-    return reference;
-  }
 }
diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java
index 348c738..8153258 100644
--- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java
+++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java
@@ -29,12 +29,6 @@
 /** Flink sql gateway's Factory for {@link SqlGatewayService}. */
 public class FlinkSqlGatewayServiceFactory implements SqlGatewayServiceFactory {
 
-  public static final ConfigOption<String> BASE_URI =
-      ConfigOption.key("base-uri")
-          .stringType()
-          .noDefaultValue()
-          .withDescription("The base uri of the flink cluster.");
-
   @Override
   public String factoryIdentifier() {
     return "flink-v1";
@@ -60,4 +54,10 @@
     String baseUri = context.getGateWayServiceOptions().get(BASE_URI.getKey());
     return new FlinkSqlGatewayImpl(baseUri);
   }
+
+  public static final ConfigOption<String> BASE_URI =
+      ConfigOption.key("base-uri")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("The base uri of the flink cluster.");
 }