[Improve] AssertUtils add and check not null improvements (#3625)

* [Improve] AssertUtils add and check not null improvements

* [Improve] code style improvements

---------

Co-authored-by: benjobs <benjobx@gmail.com>
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java b/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
deleted file mode 100644
index 22d93e5..0000000
--- a/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.common.util;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-
-/** Utils to process exception message. */
-public class ExceptionUtils {
-
-  private ExceptionUtils() {}
-
-  /**
-   * Stringify the exception object.
-   *
-   * @param throwable the target exception to stringify.
-   * @return the result of string-exception.
-   */
-  @Nonnull
-  public static String stringifyException(@Nullable Throwable throwable) {
-    if (throwable == null) {
-      return "(null)";
-    }
-    try (StringWriter stm = new StringWriter();
-        PrintWriter writer = new PrintWriter(stm)) {
-      throwable.printStackTrace(writer);
-      return stm.toString();
-    } catch (IOException e) {
-      return e.getClass().getName() + " (error while printing stack trace)";
-    }
-  }
-
-  @FunctionalInterface
-  public interface WrapperRuntimeExceptionHandler<I, O> {
-    O handle(I input) throws Exception;
-  }
-
-  public static <I, O> O wrapRuntimeException(
-      I input, WrapperRuntimeExceptionHandler<I, O> handler) {
-    try {
-      return handler.handle(input);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala b/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala
index 8e42635..1fce430 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala
@@ -18,7 +18,7 @@
 package org.apache.streampark.common.fs
 
 import org.apache.streampark.common.util.Logger
-import org.apache.streampark.common.util.Utils.{isAnyBank, requireNotEmpty}
+import org.apache.streampark.common.util.Utils.{isAnyBank, isNotEmpty}
 
 import org.apache.commons.codec.digest.DigestUtils
 import org.apache.commons.io.{FileUtils, IOUtils}
@@ -41,7 +41,7 @@
   }
 
   override def delete(path: String): Unit = {
-    if (requireNotEmpty(path)) {
+    if (isNotEmpty(path)) {
       val file = new File(path)
       if (file.exists()) {
         FileUtils.forceDelete(file)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala
new file mode 100644
index 0000000..dfcc9d0
--- /dev/null
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.common.util
+
+import org.apache.streampark.common.util.Utils.isEmpty
+
+import javax.annotation.Nullable
+
+import java.util
+
+import scala.collection.convert.ImplicitConversions._
+
+/** @since 2.2.0 */
+object AssertUtils {
+
+  /**
+   * Checks the given boolean condition, and throws an {@code IllegalArgumentException} if the
+   * condition is not met (evaluates to {@code false}).
+   *
+   * @param condition
+   *   The condition to check
+   * @throws IllegalArgumentException
+   *   Thrown, if the condition is violated.
+   */
+  def required(condition: Boolean): Unit = {
+    if (!condition) {
+      throw new IllegalArgumentException
+    }
+  }
+
+  /**
+   * Checks the given boolean condition, and throws an {@code IllegalArgumentException} if the
+   * condition is not met (evaluates to {@code false}). The exception will have the given error
+   * message.
+   *
+   * @param condition
+   *   The condition to check
+   * @param message
+   *   The message for the {@code IllegalArgumentException} that is thrown if the check fails.
+   * @throws IllegalArgumentException
+   *   Thrown, if the condition is violated.
+   */
+  def required(condition: Boolean, @Nullable message: String): Unit = {
+    if (!condition) {
+      throw new IllegalArgumentException(message)
+    }
+  }
+
+  /**
+   * Checks the given boolean condition, and throws an {@code IllegalStateException} if the
+   * condition is not met (evaluates to {@code false}).
+   *
+   * @param condition
+   *   The condition to check
+   * @throws IllegalStateException
+   *   Thrown, if the condition is violated.
+   */
+  def state(condition: Boolean): Unit = {
+    if (!condition) {
+      throw new IllegalStateException
+    }
+  }
+
+  /**
+   * Checks the given boolean condition, and throws an IllegalStateException if the condition is not
+   * met (evaluates to {@code false}). The exception will have the given error message.
+   *
+   * @param condition
+   *   The condition to check
+   * @param message
+   *   The message for the IllegalStateException that is thrown if the check fails.
+   * @throws IllegalStateException
+   *   Thrown, if the condition is violated.
+   */
+  def state(condition: Boolean, @Nullable message: String): Unit = {
+    if (!condition) {
+      throw new IllegalStateException(message)
+    }
+  }
+
+  // ------------------------------------------------------------------------
+  //  Null checks
+  // ------------------------------------------------------------------------
+  /** Ensures that the given object reference is not null. Upon violation, a */
+  def notNull[T](@Nullable reference: T): T = {
+    if (reference == null) {
+      throw new NullPointerException
+    }
+    reference
+  }
+
+  /**
+   * Ensures that the given object reference is not null. Upon violation, a NullPointerException
+   * that is thrown if the check fails.
+   *
+   * @return
+   *   The object reference itself (generically typed).
+   * @throws NullPointerException
+   *   Thrown, if the passed reference was null.
+   */
+  def notNull[T](@Nullable reference: T, @Nullable message: String): T = {
+    if (reference == null) {
+      throw new NullPointerException(message)
+    }
+    reference
+  }
+
+  /**
+   * Assert that an Array|CharSequence|JavaCollection|Map|Iterable... must not be {@code null} and
+   * must contain at least one element. <pre class="code">AssertUtils.notEmpty(array, "must be
+   * contain elements");</pre>
+   *
+   * @param reference
+   *   the object to check
+   * @throws IllegalArgumentException
+   *   if the object array is {@code null} or contains no elements
+   */
+  def notEmpty(reference: AnyRef): Unit = {
+    if (Utils.isEmpty(reference)) {
+      throw new IllegalArgumentException()
+    }
+  }
+
+  /**
+   * Assert that an Array|CharSequence|JavaCollection|Map|Iterable... must not be {@code null} and
+   * must contain at least one element. <pre class="code"> AssertUtils.notEmpty(array, "must be
+   * contain elements");</pre>
+   *
+   * @param reference
+   *   the object to check
+   * @param message
+   *   the exception message to use if the assertion fails
+   * @throws IllegalArgumentException
+   *   if the object array is {@code null} or contains no elements
+   */
+  def notEmpty(@Nullable reference: AnyRef, message: String): Unit = {
+    if (isEmpty(reference)) {
+      throw new IllegalArgumentException(message)
+    }
+  }
+
+  /**
+   * Assert that an array contains no {@code null} elements. <p>Note: Does not complain if the array
+   * is empty! <pre class="code">AssertUtils.noNullElements(array, "The array must contain non-null
+   * elements");</pre>
+   *
+   * @param array
+   *   the array to check
+   * @param message
+   *   the exception message to use if the assertion fails
+   * @throws IllegalArgumentException
+   *   if the object array contains a {@code null} element
+   */
+  def noNullElements(@Nullable array: Array[AnyRef], message: String): Unit = {
+    if (array != null) for (element <- array) {
+      if (element == null) throw new IllegalArgumentException(message)
+    }
+  }
+
+  /**
+   * Assert that a collection contains no {@code null} elements. <p>Note: Does not complain if the
+   * collection is empty! <pre class="code">AssertUtils.noNullElements(collection, "Collection must
+   * contain non-null elements");</pre>
+   *
+   * @param collection
+   *   the collection to check
+   * @param message
+   *   the exception message to use if the assertion fails
+   * @throws IllegalArgumentException
+   *   if the collection contains a {@code null} element
+   */
+  def noNullElements(@Nullable collection: util.Collection[_], message: String): Unit = {
+    if (collection != null) for (element <- collection) {
+      if (element == null) {
+        throw new IllegalArgumentException(message)
+      }
+    }
+  }
+
+  /**
+   * Assert that the given String is not empty; that is, it must not be {@code null} and not the
+   * empty String. <pre class="code">AssertUtils.hasLength(name, "Name must not be empty");</pre>
+   *
+   * @param text
+   *   the String to check
+   * @throws IllegalArgumentException
+   *   if the text is empty
+   * @see
+   *   StringUtils#hasLength
+   */
+  def hasLength(@Nullable text: String): Unit = {
+    if (!getHasLength(text)) {
+      throw new IllegalArgumentException()
+    }
+  }
+
+  /**
+   * Assert that the given String is not empty; that is, it must not be {@code null} and not the
+   * empty String. <pre class="code">AssertUtils.hasLength(name, "Name must not be empty");</pre>
+   *
+   * @param text
+   *   the String to check
+   * @param message
+   *   the exception message to use if the assertion fails
+   * @throws IllegalArgumentException
+   *   if the text is empty
+   * @see
+   *   StringUtils#hasLength
+   */
+  def hasLength(@Nullable text: String, message: String): Unit = {
+    if (!getHasLength(text)) {
+      throw new IllegalArgumentException(message)
+    }
+  }
+
+  /**
+   * Assert that the given String contains valid text content; that is, it must not be {@code null}
+   * and must contain at least one non-whitespace character. <pre
+   * class="code">AssertUtils.hasText(name, "'name' must not be empty");</pre>
+   *
+   * @param text
+   *   the String to check
+   * @throws IllegalArgumentException
+   *   if the text does not contain valid text content
+   * @see
+   *   StringUtils#hasText
+   */
+  def hasText(@Nullable text: String): Unit = {
+    if (!getHasText(text)) {
+      throw new IllegalArgumentException()
+    }
+  }
+
+  /**
+   * Assert that the given String contains valid text content; that is, it must not be {@code null}
+   * and must contain at least one non-whitespace character. <pre
+   * class="code">AssertUtils.hasText(name, "'name' must not be empty");</pre>
+   *
+   * @param text
+   *   the String to check
+   * @param message
+   *   the exception message to use if the assertion fails
+   * @throws IllegalArgumentException
+   *   if the text does not contain valid text content
+   * @see
+   *   StringUtils#hasText
+   */
+  def hasText(@Nullable text: String, message: String): Unit = {
+    if (!getHasText(text)) {
+      throw new IllegalArgumentException(message)
+    }
+  }
+
+  private[this] def getHasLength(@Nullable str: String): Boolean = str != null && str.nonEmpty
+
+  private[this] def getHasText(@Nullable str: String): Boolean = {
+    str != null && str.nonEmpty && containsText(str)
+  }
+
+  private[this] def containsText(str: CharSequence): Boolean = {
+    val strLen = str.length
+    for (i <- 0 until strLen) {
+      if (!Character.isWhitespace(str.charAt(i))) {
+        return true
+      }
+    }
+    false
+  }
+
+}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ExceptionUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ExceptionUtils.scala
new file mode 100644
index 0000000..a382272
--- /dev/null
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ExceptionUtils.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.util
+
+import javax.annotation.Nonnull
+import javax.annotation.Nullable
+
+import java.io.PrintWriter
+import java.io.StringWriter
+
+object ExceptionUtils {
+
+  /**
+   * Stringify the exception object.
+   *
+   * @param throwable
+   *   the target exception to stringify.
+   * @return
+   *   the result of string-exception.
+   */
+  @Nonnull def stringifyException(@Nullable throwable: Throwable): String = {
+    if (throwable == null) {
+      return "(null)"
+    }
+    val stm = new StringWriter()
+    val writer = new PrintWriter(stm)
+    try {
+      throwable.printStackTrace(writer);
+      stm.toString
+    } catch {
+      case e: Exception => s"${e.getClass.getName} (error while printing stack trace)";
+      case _ => null
+    } finally {
+      Utils.close(writer, stm)
+    }
+  }
+
+  @FunctionalInterface trait WrapperRuntimeExceptionHandler[I, O] {
+    @throws[Exception]
+    def handle(input: I): O
+  }
+
+  def wrapRuntimeException[I, O](input: I, handler: WrapperRuntimeExceptionHandler[I, O]): O = {
+    try handler.handle(input)
+    catch {
+      case e: Exception =>
+        throw new RuntimeException(e)
+    }
+  }
+}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 57cf5e5..e8fe7ef 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -92,9 +92,10 @@
 
   def getPathFromEnv(env: String): String = {
     val path = Option(System.getenv(env)).getOrElse(System.getProperty(env))
-    require(
-      Utils.requireNotEmpty(path),
-      s"[StreamPark] FileUtils.getPathFromEnv: $env is not set on system env")
+    AssertUtils.notNull(
+      path,
+      s"[StreamPark] FileUtils.getPathFromEnv: $env is not set on system env"
+    )
     val file = new File(path)
     require(file.exists(), s"[StreamPark] FileUtils.getPathFromEnv: $env is not exist!")
     file.getAbsolutePath
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
index faf1fc9..ac097b1 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
@@ -134,7 +134,7 @@
     val shadedPackage = "org.apache.streampark.shaded"
 
     override def configureByResource(url: URL): Unit = {
-      Utils.requireNotNull(url, "URL argument cannot be null")
+      AssertUtils.notNull(url, "URL argument cannot be null")
       val path = url.getPath
       if (path.endsWith("xml")) {
         val configurator = new JoranConfigurator()
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 541f57f..e3a0ae5 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -281,7 +281,7 @@
       val map = mutable.Map[String, String]()
       val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "")
       simple.split("\\s?-D") match {
-        case d if Utils.requireNotEmpty(d) =>
+        case d if Utils.isNotEmpty(d) =>
           d.foreach(
             x => {
               if (x.nonEmpty) {
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index fcba212..ed27537 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -33,17 +33,7 @@
 
   private[this] lazy val OS = System.getProperty("os.name").toLowerCase
 
-  def requireNotNull(obj: Any, message: String): Unit = {
-    if (obj == null) {
-      throw new NullPointerException(message)
-    }
-  }
-
-  def requireNotNull(obj: Any): Unit = {
-    requireNotNull(obj, "this argument must not be null")
-  }
-
-  def requireNotEmpty(elem: Any): Boolean = {
+  def isNotEmpty(elem: Any): Boolean = {
     elem match {
       case null => false
       case x if x.isInstanceOf[Array[_]] => elem.asInstanceOf[Array[_]].nonEmpty
@@ -56,19 +46,7 @@
     }
   }
 
-  def isEmpty(elem: Any): Boolean = !requireNotEmpty(elem)
-
-  def required(expression: Boolean): Unit = {
-    if (!expression) {
-      throw new IllegalArgumentException
-    }
-  }
-
-  def required(expression: Boolean, errorMessage: Any): Unit = {
-    if (!expression) {
-      throw new IllegalArgumentException(s"Requirement failed: ${errorMessage.toString}")
-    }
-  }
+  def isEmpty(elem: Any): Boolean = !isNotEmpty(elem)
 
   def uuid(): String = UUID.randomUUID().toString.replaceAll("-", "")
 
diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/UtilsTest.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/UtilsTest.scala
index afc5f04..5f7a5c8 100644
--- a/streampark-common/src/test/scala/org/apache/streampark/common/util/UtilsTest.scala
+++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/UtilsTest.scala
@@ -30,31 +30,31 @@
 
   test("requiredNotNull should throw NullPointerException if argument is null") {
     val nullPointerException = intercept[NullPointerException] {
-      Utils.requireNotNull(null, "object can't be null")
+      AssertUtils.notNull(null, "object can't be null")
     }
     assert(nullPointerException.getMessage == "object can't be null")
   }
 
   test("requireNotEmpty should check if argument is not empty") {
-    assert(!Utils.requireNotEmpty(null))
-    assert(Utils.requireNotEmpty(Array(1)))
-    assert(Utils.requireNotEmpty("string"))
-    assert(Utils.requireNotEmpty(Seq("Seq")))
-    assert(Utils.requireNotEmpty(Iterable("Iterable")))
+    assert(!Utils.isNotEmpty(null))
+    assert(Utils.isNotEmpty(Array(1)))
+    assert(Utils.isNotEmpty("string"))
+    assert(Utils.isNotEmpty(Seq("Seq")))
+    assert(Utils.isNotEmpty(Iterable("Iterable")))
 
     val arrayList = new util.ArrayList[String](16)
     arrayList.add("arrayList")
-    assert(Utils.requireNotEmpty(arrayList))
+    assert(Utils.isNotEmpty(arrayList))
 
     val hashMap = new util.HashMap[String, String](16)
     hashMap.put("hash", "map")
-    assert(Utils.requireNotEmpty(hashMap))
-    assert(Utils.requireNotEmpty())
+    assert(Utils.isNotEmpty(hashMap))
+    assert(Utils.isNotEmpty())
   }
 
   test("required should throw IllegalArgumentException if condition is false") {
     val illegalArgumentException = intercept[IllegalArgumentException] {
-      Utils.required(false)
+      AssertUtils.required(false)
     }
     assert(illegalArgumentException.getMessage == null)
   }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java
index d9eb147..e93737d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java
@@ -43,7 +43,7 @@
 
   @SneakyThrows
   public static Dependency toDependency(String dependency) {
-    if (Utils.requireNotEmpty(dependency)) {
+    if (Utils.isNotEmpty(dependency)) {
       return JacksonUtils.read(dependency, new TypeReference<Dependency>() {});
     }
     return new Dependency();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
index f3e713e..ab45f2d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.component;
 
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.console.core.bean.AlertTemplate;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.SavePoint;
@@ -28,8 +29,6 @@
 import org.apache.streampark.console.core.service.application.ApplicationActionService;
 import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
 
-import org.apache.flink.shaded.guava30.com.google.common.base.Preconditions;
-
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import lombok.Data;
@@ -120,10 +119,9 @@
     checkPointFailedCache.remove(appId);
     FailoverStrategyEnum failoverStrategyEnum =
         FailoverStrategyEnum.of(application.getCpFailureAction());
-    Preconditions.checkArgument(
+    AssertUtils.required(
         failoverStrategyEnum != null,
-        "Unexpected cpFailureAction: %s",
-        application.getCpFailureAction());
+        "Unexpected cpFailureAction: " + application.getCpFailureAction());
     processFailoverStrategy(application, failoverStrategyEnum);
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
index 5af4fc0..22076c5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.core.controller;
 
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.core.entity.ExternalLink;
 import org.apache.streampark.console.core.service.ExternalLinkService;
@@ -82,7 +82,7 @@
   @PostMapping("/update")
   @RequiresPermissions("externalLink:update")
   public RestResponse update(@Valid ExternalLink externalLink) {
-    Utils.requireNotNull(externalLink.getId(), "The link id cannot be null");
+    AssertUtils.notNull(externalLink.getId(), "The link id cannot be null");
     externalLinkService.update(externalLink);
     return RestResponse.success();
   }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
index 1005cb3..9f5bd6c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
@@ -20,6 +20,7 @@
 import org.apache.streampark.common.conf.CommonConfig;
 import org.apache.streampark.common.conf.InternalConfigHolder;
 import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.exception.ApiDetailException;
 import org.apache.streampark.console.base.util.GitUtils;
@@ -28,7 +29,6 @@
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.shaded.guava30.com.google.common.base.Preconditions;
 
 import com.baomidou.mybatisplus.annotation.FieldStrategy;
 import com.baomidou.mybatisplus.annotation.IdType;
@@ -40,8 +40,6 @@
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.jgit.lib.Constants;
 
-import javax.annotation.Nonnull;
-
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
@@ -188,6 +186,33 @@
 
   @JsonIgnore
   public String getMavenArgs() {
+    // 1) check build args
+    String buildArg = getMvnBuildArgs();
+    StringBuilder argBuilder = new StringBuilder();
+    if (StringUtils.isNotBlank(buildArg)) {
+      argBuilder.append(buildArg);
+    }
+
+    // 2) mvn setting file
+    String mvnSetting = getMvnSetting();
+    if (StringUtils.isNotBlank(mvnSetting)) {
+      argBuilder.append(" --settings ").append(mvnSetting);
+    }
+
+    // 3) check args
+    String cmd = argBuilder.toString();
+    String illegalArg = getIllegalArgs(cmd);
+    if (illegalArg != null) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Invalid maven argument, illegal args: %s, in your maven args: %s", illegalArg, cmd));
+    }
+
+    String mvn = getMvn();
+    return mvn.concat(" ").concat(cmd);
+  }
+
+  private String getMvn() {
     boolean windows = Utils.isWindows();
     String mvn = windows ? "mvn.cmd" : "mvn";
 
@@ -200,7 +225,7 @@
       try {
         Process process = Runtime.getRuntime().exec(mvn + " --version");
         process.waitFor();
-        Utils.required(process.exitValue() == 0);
+        AssertUtils.required(process.exitValue() == 0);
         useWrapper = false;
       } catch (Exception ignored) {
         log.warn("try using user-installed maven failed, now use maven-wrapper.");
@@ -210,44 +235,32 @@
     if (useWrapper) {
       mvn = WebUtils.getAppHome().concat(windows ? "/bin/mvnw.cmd" : "/bin/mvnw");
     }
-
-    return renderCmd(mvn);
+    return mvn;
   }
 
-  @Nonnull
-  private String renderCmd(String mvn) {
-    StringBuilder cmdBuffer = new StringBuilder(mvn).append(" clean package -DskipTests ");
-    renderCmdByBuildArgs(cmdBuffer);
-    renderCmdBySetting(cmdBuffer);
-    return cmdBuffer.toString();
-  }
-
-  private void renderCmdByBuildArgs(StringBuilder cmdBuffer) {
+  private String getMvnBuildArgs() {
     if (StringUtils.isNotBlank(this.buildArgs)) {
       String args = getIllegalArgs(this.buildArgs);
-      Preconditions.checkArgument(
+      AssertUtils.required(
           args == null,
-          "Illegal argument: \"%s\" in maven build parameters: %s",
-          args,
-          this.buildArgs);
-      cmdBuffer.append(this.buildArgs.trim());
+          String.format(
+              "Illegal argument: \"%s\" in maven build parameters: %s", args, this.buildArgs));
+      return this.buildArgs.trim();
     }
+    return null;
   }
 
-  private void renderCmdBySetting(StringBuilder cmdBuffer) {
+  private String getMvnSetting() {
     String setting = InternalConfigHolder.get(CommonConfig.MAVEN_SETTINGS_PATH());
     if (StringUtils.isBlank(setting)) {
-      return;
+      return null;
     }
-    String args = getIllegalArgs(setting);
-    Preconditions.checkArgument(
-        args == null, "Illegal argument \"%s\" in maven-setting file path: %s", args, setting);
     File file = new File(setting);
-    Preconditions.checkArgument(
+    AssertUtils.required(
         !file.exists() || !file.isFile(),
-        "Invalid maven-setting file path \"%s\", the path not exist or is not file",
-        setting);
-    cmdBuffer.append(" --settings ").append(setting);
+        String.format(
+            "Invalid maven-setting file path \"%s\", the path not exist or is not file", setting));
+    return setting;
   }
 
   private String getIllegalArgs(String param) {
@@ -257,7 +270,7 @@
       return matcher.group(1) == null ? matcher.group(2) : matcher.group(1);
     }
 
-    Iterator<String> iterator = Arrays.asList(";", "|", "&", ">").iterator();
+    Iterator<String> iterator = Arrays.asList(";", "|", "&", ">", "<").iterator();
     String[] argsList = param.split("\\s+");
     while (iterator.hasNext()) {
       String chr = iterator.next();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 635c2a4..3454dd8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -24,8 +24,8 @@
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.SystemPropertyUtils;
-import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.common.zio.ZIOExt;
 import org.apache.streampark.console.base.util.WebUtils;
 import org.apache.streampark.console.core.entity.FlinkEnv;
@@ -101,7 +101,7 @@
         .forEach(
             key -> {
               InternalOption config = InternalConfigHolder.getConfig(key);
-              Utils.requireNotNull(config);
+              AssertUtils.notNull(config);
               InternalConfigHolder.set(config, env.getProperty(key, config.classType()));
             });
 
@@ -181,7 +181,7 @@
 
   private void uploadClientJar(Workspace workspace, FsOperator fsOperator) {
     File client = WebUtils.getAppClientDir();
-    Utils.required(
+    AssertUtils.required(
         client.exists() && client.listFiles().length > 0,
         client.getAbsolutePath().concat(" is not exists or empty directory "));
 
@@ -198,7 +198,7 @@
     File[] shims =
         WebUtils.getAppLibDir()
             .listFiles(pathname -> pathname.getName().matches(PATTERN_FLINK_SHIMS_JAR.pattern()));
-    Utils.required(shims != null && shims.length > 0, "streampark-flink-shims jar not exist");
+    AssertUtils.required(shims != null && shims.length > 0, "streampark-flink-shims jar not exist");
 
     String appShims = workspace.APP_SHIMS();
     fsOperator.delete(appShims);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 2974faa..60047f4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -27,11 +27,11 @@
 import org.apache.streampark.common.enums.FlinkRestoreMode;
 import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
 import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
-import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApplicationException;
 import org.apache.streampark.console.core.entity.AppBuildPipeline;
@@ -381,7 +381,7 @@
   public void start(Application appParam, boolean auto) throws Exception {
     // 1) check application
     final Application application = getById(appParam.getId());
-    Utils.requireNotNull(application);
+    AssertUtils.notNull(application);
     ApiAlertException.throwIfTrue(
         !application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly.");
 
@@ -397,7 +397,7 @@
     }
 
     AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
-    Utils.requireNotNull(buildPipeline);
+    AssertUtils.notNull(buildPipeline);
 
     FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
     ApiAlertException.throwIfNull(flinkEnv, "[StreamPark] can no found flink version");
@@ -635,7 +635,7 @@
     switch (application.getDevelopmentMode()) {
       case FLINK_SQL:
         FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
-        Utils.requireNotNull(flinkSql);
+        AssertUtils.notNull(flinkSql);
         // 1) dist_userJar
         String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
         // 2) appConfig
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index ffcafc2..3370e66 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -24,9 +24,9 @@
 import org.apache.streampark.common.enums.FlinkDevelopmentMode;
 import org.apache.streampark.common.enums.FlinkExecutionMode;
 import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
 import org.apache.streampark.common.util.FileUtils;
-import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.util.JacksonUtils;
 import org.apache.streampark.console.base.util.WebUtils;
@@ -194,7 +194,7 @@
     FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false);
     if (app.isFlinkSqlJobOrPyFlinkJob()) {
       FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql;
-      Utils.requireNotNull(flinkSql);
+      AssertUtils.notNull(flinkSql);
       app.setDependency(flinkSql.getDependency());
       app.setTeamResource(flinkSql.getTeamResource());
     }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
index 1a88494..ebf979c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.core.service.impl;
 
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.ExternalLink;
 import org.apache.streampark.console.core.enums.PlaceholderTypeEnum;
@@ -77,7 +77,7 @@
   @Override
   public List<ExternalLink> render(Long appId) {
     Application app = applicationManageService.getById(appId);
-    Utils.requireNotNull(app, "Application doesn't exist");
+    AssertUtils.notNull(app, "Application doesn't exist");
     List<ExternalLink> externalLink = this.list();
     if (externalLink != null && externalLink.size() > 0) {
       // Render the placeholder
@@ -112,10 +112,10 @@
     if (result == null) {
       return true;
     }
-    Utils.required(
+    AssertUtils.required(
         !result.getBadgeName().equals(params.getBadgeName()),
         String.format("The name: %s is already existing.", result.getBadgeName()));
-    Utils.required(
+    AssertUtils.required(
         !result.getLinkUrl().equals(params.getLinkUrl()),
         String.format("The linkUrl: %s is already existing.", result.getLinkUrl()));
     return false;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index d648a22..3599ed8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -17,9 +17,9 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
-import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
 import org.apache.streampark.console.core.entity.Application;
@@ -172,11 +172,11 @@
   @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
   public void rollback(Application application) {
     FlinkSql sql = getCandidate(application.getId(), CandidateTypeEnum.HISTORY);
-    Utils.requireNotNull(sql);
+    AssertUtils.notNull(sql);
     try {
       // check and backup current job
       FlinkSql effectiveSql = getEffective(application.getId(), false);
-      Utils.requireNotNull(effectiveSql);
+      AssertUtils.notNull(effectiveSql);
       // rollback history sql
       backUpService.rollbackFlinkSql(application, sql);
     } catch (Exception e) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index cdb1859..778b1a0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -21,9 +21,9 @@
 import org.apache.streampark.common.conf.CommonConfig;
 import org.apache.streampark.common.conf.InternalConfigHolder;
 import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.FileUtils;
-import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.ResponseCode;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.domain.RestResponse;
@@ -109,7 +109,7 @@
   @Override
   public boolean update(Project projectParam) {
     Project project = getById(projectParam.getId());
-    Utils.requireNotNull(project);
+    AssertUtils.notNull(project);
     ApiAlertException.throwIfFalse(
         project.getTeamId().equals(projectParam.getTeamId()),
         "TeamId can't be changed, update project failed.");
@@ -155,7 +155,7 @@
   @Override
   public boolean removeById(Long id) {
     Project project = getById(id);
-    Utils.requireNotNull(project);
+    AssertUtils.notNull(project);
     LambdaQueryWrapper<Application> queryWrapper =
         new LambdaQueryWrapper<Application>().eq(Application::getProjectId, id);
     long count = applicationManageService.count(queryWrapper);
@@ -227,7 +227,7 @@
   @Override
   public List<String> listModules(Long id) {
     Project project = getById(id);
-    Utils.requireNotNull(project);
+    AssertUtils.notNull(project);
 
     if (BuildStateEnum.SUCCESSFUL != BuildStateEnum.of(project.getBuildState())
         || !project.getDistHome().exists()) {
@@ -293,7 +293,7 @@
       }
       List<Map<String, Object>> confList = new ArrayList<>();
       File[] files = unzipFile.listFiles(x -> "conf".equals(x.getName()));
-      Utils.requireNotNull(files);
+      AssertUtils.notNull(files);
       for (File item : files) {
         eachFile(item, confList, true);
       }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 8616e5a..c973be1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -18,9 +18,9 @@
 package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.enums.FlinkExecutionMode;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
-import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.InternalException;
@@ -288,7 +288,7 @@
     Map<String, Object> properties = new HashMap<>();
 
     if (FlinkExecutionMode.isRemoteMode(application.getFlinkExecutionMode())) {
-      Utils.requireNotNull(
+      AssertUtils.notNull(
           cluster,
           String.format(
               "The clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
@@ -306,7 +306,7 @@
     }
     if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
       if (FlinkExecutionMode.YARN_SESSION == application.getFlinkExecutionMode()) {
-        Utils.requireNotNull(
+        AssertUtils.notNull(
             cluster,
             String.format(
                 "The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
@@ -374,7 +374,7 @@
 
     // At the remote mode, request the flink webui interface to get the savepoint path
     FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
-    Utils.requireNotNull(
+    AssertUtils.notNull(
         cluster,
         String.format(
             "The clusterId=%s cannot be find, maybe the clusterId is wrong or "
@@ -439,8 +439,8 @@
   private void expire(SavePoint entity) {
     FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
     Application application = applicationManageService.getById(entity.getAppId());
-    Utils.requireNotNull(flinkEnv);
-    Utils.requireNotNull(application);
+    AssertUtils.notNull(flinkEnv);
+    AssertUtils.notNull(application);
 
     int cpThreshold =
         tryGetChkNumRetainedFromDynamicProps(application.getDynamicProperties())
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
index 0b70f9d..d2ba899 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.core.service.impl;
 
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.console.core.service.SqlCompleteService;
 
 import com.google.common.collect.Sets;
@@ -56,7 +56,7 @@
 
   @Override
   public List<String> getComplete(String sql) {
-    if (sql.length() > 0 && BLACK_SET.contains(sql.charAt(sql.length() - 1))) {
+    if (!sql.isEmpty() && BLACK_SET.contains(sql.charAt(sql.length() - 1))) {
       return new ArrayList<>();
     }
     String[] temp = sql.split("\\s");
@@ -184,7 +184,7 @@
         nowStep = nowStep.get(nowChar).getNext();
         loc += 1;
       }
-      Utils.requireNotNull(preNode);
+      AssertUtils.notNull(preNode);
       preNode.setStop();
       preNode.setCount(count);
     }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
index 9860654..d52bdc9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
@@ -18,7 +18,7 @@
 package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.enums.FlinkExecutionMode;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
@@ -73,8 +73,8 @@
 
   @Override
   public IPage<YarnQueue> getPage(YarnQueue yarnQueue, RestRequest request) {
-    Utils.requireNotNull(yarnQueue, "Yarn queue query params mustn't be null.");
-    Utils.requireNotNull(
+    AssertUtils.notNull(yarnQueue, "Yarn queue query params mustn't be null.");
+    AssertUtils.notNull(
         yarnQueue.getTeamId(), "Team id of yarn queue query params mustn't be null.");
     Page<YarnQueue> page = MybatisPager.getPage(request);
     return this.baseMapper.selectPage(page, yarnQueue);
@@ -88,8 +88,8 @@
   @Override
   public ResponseResult<String> checkYarnQueue(YarnQueue yarnQueue) {
 
-    Utils.requireNotNull(yarnQueue, "Yarn queue mustn't be empty.");
-    Utils.requireNotNull(yarnQueue.getTeamId(), "Team id mustn't be null.");
+    AssertUtils.notNull(yarnQueue, "Yarn queue mustn't be empty.");
+    AssertUtils.notNull(yarnQueue.getTeamId(), "Team id mustn't be null.");
 
     ResponseResult<String> responseResult = new ResponseResult<>();
 
@@ -206,8 +206,8 @@
 
   @VisibleForTesting
   public YarnQueue getYarnQueueByIdWithPreconditions(YarnQueue yarnQueue) {
-    Utils.requireNotNull(yarnQueue, "Yarn queue mustn't be null.");
-    Utils.requireNotNull(yarnQueue.getId(), "Yarn queue id mustn't be null.");
+    AssertUtils.notNull(yarnQueue, "Yarn queue mustn't be null.");
+    AssertUtils.notNull(yarnQueue.getId(), "Yarn queue id mustn't be null.");
     YarnQueue queueFromDB = getById(yarnQueue.getId());
     ApiAlertException.throwIfNull(queueFromDB, "The queue doesn't exist.");
     return queueFromDB;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
index e2154d0..a03c74e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.system.service.impl;
 
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
@@ -152,8 +152,10 @@
   public void updateMember(Member member) {
     Member oldMember = this.getById(member.getId());
     ApiAlertException.throwIfNull(oldMember, "The member [id=%s] not found", member.getId());
-    Utils.required(oldMember.getTeamId().equals(member.getTeamId()), "Team id cannot be changed.");
-    Utils.required(oldMember.getUserId().equals(member.getUserId()), "User id cannot be changed.");
+    AssertUtils.state(
+        oldMember.getTeamId().equals(member.getTeamId()), "Team id cannot be changed.");
+    AssertUtils.state(
+        oldMember.getUserId().equals(member.getUserId()), "User id cannot be changed.");
     ApiAlertException.throwIfNull(
         roleService.getById(member.getRoleId()), "The roleId [%s] not found", member.getRoleId());
     oldMember.setRoleId(member.getRoleId());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
index 1ae53fe..8ce2516 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
@@ -17,8 +17,8 @@
 
 package org.apache.streampark.console.system.service.impl;
 
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.DateUtils;
-import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.ResponseCode;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.domain.RestResponse;
@@ -93,7 +93,7 @@
   public IPage<User> getPage(User user, RestRequest request) {
     Page<User> page = MybatisPager.getPage(request);
     IPage<User> resPage = this.baseMapper.selectPage(page, user);
-    Utils.requireNotNull(resPage);
+    AssertUtils.notNull(resPage);
     if (resPage.getTotal() == 0) {
       resPage.setRecords(Collections.emptyList());
     }
@@ -197,7 +197,7 @@
   @Override
   public void setLastTeam(Long teamId, Long userId) {
     User user = getById(userId);
-    Utils.requireNotNull(user);
+    AssertUtils.notNull(user);
     user.setLastTeamId(teamId);
     this.baseMapper.updateById(user);
   }
@@ -205,7 +205,7 @@
   @Override
   public void clearLastTeam(Long userId, Long teamId) {
     User user = getById(userId);
-    Utils.requireNotNull(user);
+    AssertUtils.notNull(user);
     if (!teamId.equals(user.getLastTeamId())) {
       return;
     }
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
index 398fce3..c7fd108 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
@@ -19,10 +19,10 @@
 
 import org.apache.streampark.common.conf.CommonConfig;
 import org.apache.streampark.common.conf.ConfigKeys;
+import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.SystemPropertyUtils;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.flink.util.Preconditions;
 
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.BeforeAll;
@@ -104,13 +104,12 @@
   }
 
   private static File tryFindStreamParkPackagedDirFile() {
-    String userDir = Preconditions.checkNotNull(SystemPropertyUtils.get("user.dir"));
+    String userDir = AssertUtils.notNull(SystemPropertyUtils.get("user.dir"));
     File pkgTargetDirFile = new File(userDir, "target");
-    Preconditions.checkState(
+    AssertUtils.state(
         pkgTargetDirFile.exists(),
-        "The target directory of %s doesn't exist. %s",
-        userDir,
-        RUN_PKG_SCRIPT_HINT);
+        String.format(
+            "The target directory of %s doesn't exist. %s", userDir, RUN_PKG_SCRIPT_HINT));
     Optional<File> availablePkgParentFileOpt =
         Arrays.stream(requireNonNull(pkgTargetDirFile.listFiles(PKG_NAME_FILTER))).findFirst();
     final File availablePkgParentFile =
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala
index e06a9a1..142bedc 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala
@@ -61,7 +61,7 @@
   /** set docker-host for kata */
   def setDockerHost(): Unit = {
     val dockerhost: String = InternalConfigHolder.get(CommonConfig.DOCKER_HOST)
-    if (Utils.requireNotEmpty(dockerhost)) {
+    if (Utils.isNotEmpty(dockerhost)) {
       val dockerHostUri: URI = new URI(dockerhost)
       dockerHttpClientBuilder.dockerHost(dockerHostUri)
     }
diff --git a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
index f1a8b47..326ff17 100644
--- a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
+++ b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.testcontainer.flink;
 
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,9 +98,9 @@
 
   @Override
   public void start() {
-    Utils.requireNotNull(jobManagerContainer);
+    AssertUtils.notNull(jobManagerContainer);
     jobManagerContainer.start();
-    Utils.requireNotNull(taskManagerContainers);
+    AssertUtils.notNull(taskManagerContainers);
     for (FlinkContainer taskManagerContainer : taskManagerContainers) {
       taskManagerContainer.start();
     }
@@ -108,11 +108,11 @@
 
   @Override
   public void stop() {
-    Utils.requireNotNull(taskManagerContainers);
+    AssertUtils.notNull(taskManagerContainers);
     for (FlinkContainer taskManagerContainer : taskManagerContainers) {
       taskManagerContainer.stop();
     }
-    Utils.requireNotNull(jobManagerContainer);
+    AssertUtils.notNull(jobManagerContainer);
     jobManagerContainer.stop();
   }
 
@@ -151,13 +151,13 @@
     }
 
     public Builder taskManagerNum(int taskManagerNum) {
-      Utils.required(taskManagerNum >= 0, "taskManagerNum must be greater than -1.");
+      AssertUtils.required(taskManagerNum >= 0, "taskManagerNum must be greater than -1.");
       this.taskManagerNum = taskManagerNum;
       return this;
     }
 
     public Builder slotsNumPerTm(int slotsNumPerTm) {
-      Utils.required(slotsNumPerTm > 0, "slotsNumPerTm must be greater than 0.");
+      AssertUtils.required(slotsNumPerTm > 0, "slotsNumPerTm must be greater than 0.");
       this.slotsNumPerTm = slotsNumPerTm;
       return this;
     }