[Improve] AssertUtils add and check not null improvements (#3625)
* [Improve] AssertUtils add and check not null improvements
* [Improve] code style improvements
---------
Co-authored-by: benjobs <benjobx@gmail.com>
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java b/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
deleted file mode 100644
index 22d93e5..0000000
--- a/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.common.util;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-
-/** Utils to process exception message. */
-public class ExceptionUtils {
-
- private ExceptionUtils() {}
-
- /**
- * Stringify the exception object.
- *
- * @param throwable the target exception to stringify.
- * @return the result of string-exception.
- */
- @Nonnull
- public static String stringifyException(@Nullable Throwable throwable) {
- if (throwable == null) {
- return "(null)";
- }
- try (StringWriter stm = new StringWriter();
- PrintWriter writer = new PrintWriter(stm)) {
- throwable.printStackTrace(writer);
- return stm.toString();
- } catch (IOException e) {
- return e.getClass().getName() + " (error while printing stack trace)";
- }
- }
-
- @FunctionalInterface
- public interface WrapperRuntimeExceptionHandler<I, O> {
- O handle(I input) throws Exception;
- }
-
- public static <I, O> O wrapRuntimeException(
- I input, WrapperRuntimeExceptionHandler<I, O> handler) {
- try {
- return handler.handle(input);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala b/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala
index 8e42635..1fce430 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/fs/LfsOperator.scala
@@ -18,7 +18,7 @@
package org.apache.streampark.common.fs
import org.apache.streampark.common.util.Logger
-import org.apache.streampark.common.util.Utils.{isAnyBank, requireNotEmpty}
+import org.apache.streampark.common.util.Utils.{isAnyBank, isNotEmpty}
import org.apache.commons.codec.digest.DigestUtils
import org.apache.commons.io.{FileUtils, IOUtils}
@@ -41,7 +41,7 @@
}
override def delete(path: String): Unit = {
- if (requireNotEmpty(path)) {
+ if (isNotEmpty(path)) {
val file = new File(path)
if (file.exists()) {
FileUtils.forceDelete(file)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala
new file mode 100644
index 0000000..dfcc9d0
--- /dev/null
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/AssertUtils.scala
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.common.util
+
+import org.apache.streampark.common.util.Utils.isEmpty
+
+import javax.annotation.Nullable
+
+import java.util
+
+import scala.collection.convert.ImplicitConversions._
+
+/** @since 2.2.0 */
+object AssertUtils {
+
+ /**
+ * Checks the given boolean condition, and throws an {@code IllegalArgumentException} if the
+ * condition is not met (evaluates to {@code false}).
+ *
+ * @param condition
+ * The condition to check
+ * @throws IllegalArgumentException
+ * Thrown, if the condition is violated.
+ */
+ def required(condition: Boolean): Unit = {
+ if (!condition) {
+ throw new IllegalArgumentException
+ }
+ }
+
+ /**
+ * Checks the given boolean condition, and throws an {@code IllegalArgumentException} if the
+ * condition is not met (evaluates to {@code false}). The exception will have the given error
+ * message.
+ *
+ * @param condition
+ * The condition to check
+ * @param message
+ * The message for the {@code IllegalArgumentException} that is thrown if the check fails.
+ * @throws IllegalArgumentException
+ * Thrown, if the condition is violated.
+ */
+ def required(condition: Boolean, @Nullable message: String): Unit = {
+ if (!condition) {
+ throw new IllegalArgumentException(message)
+ }
+ }
+
+ /**
+ * Checks the given boolean condition, and throws an {@code IllegalStateException} if the
+ * condition is not met (evaluates to {@code false}).
+ *
+ * @param condition
+ * The condition to check
+ * @throws IllegalStateException
+ * Thrown, if the condition is violated.
+ */
+ def state(condition: Boolean): Unit = {
+ if (!condition) {
+ throw new IllegalStateException
+ }
+ }
+
+ /**
+ * Checks the given boolean condition, and throws an IllegalStateException if the condition is not
+ * met (evaluates to {@code false}). The exception will have the given error message.
+ *
+ * @param condition
+ * The condition to check
+ * @param message
+ * The message for the IllegalStateException that is thrown if the check fails.
+ * @throws IllegalStateException
+ * Thrown, if the condition is violated.
+ */
+ def state(condition: Boolean, @Nullable message: String): Unit = {
+ if (!condition) {
+ throw new IllegalStateException(message)
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Null checks
+ // ------------------------------------------------------------------------
+ /** Ensures that the given object reference is not null. Upon violation, a */
+ def notNull[T](@Nullable reference: T): T = {
+ if (reference == null) {
+ throw new NullPointerException
+ }
+ reference
+ }
+
+ /**
+ * Ensures that the given object reference is not null. Upon violation, a NullPointerException
+ * that is thrown if the check fails.
+ *
+ * @return
+ * The object reference itself (generically typed).
+ * @throws NullPointerException
+ * Thrown, if the passed reference was null.
+ */
+ def notNull[T](@Nullable reference: T, @Nullable message: String): T = {
+ if (reference == null) {
+ throw new NullPointerException(message)
+ }
+ reference
+ }
+
+ /**
+ * Assert that an Array|CharSequence|JavaCollection|Map|Iterable... must not be {@code null} and
+ * must contain at least one element. <pre class="code">AssertUtils.notEmpty(array, "must be
+ * contain elements");</pre>
+ *
+ * @param reference
+ * the object to check
+ * @throws IllegalArgumentException
+ * if the object array is {@code null} or contains no elements
+ */
+ def notEmpty(reference: AnyRef): Unit = {
+ if (Utils.isEmpty(reference)) {
+ throw new IllegalArgumentException()
+ }
+ }
+
+ /**
+ * Assert that an Array|CharSequence|JavaCollection|Map|Iterable... must not be {@code null} and
+ * must contain at least one element. <pre class="code"> AssertUtils.notEmpty(array, "must be
+ * contain elements");</pre>
+ *
+ * @param reference
+ * the object to check
+ * @param message
+ * the exception message to use if the assertion fails
+ * @throws IllegalArgumentException
+ * if the object array is {@code null} or contains no elements
+ */
+ def notEmpty(@Nullable reference: AnyRef, message: String): Unit = {
+ if (isEmpty(reference)) {
+ throw new IllegalArgumentException(message)
+ }
+ }
+
+ /**
+ * Assert that an array contains no {@code null} elements. <p>Note: Does not complain if the array
+ * is empty! <pre class="code">AssertUtils.noNullElements(array, "The array must contain non-null
+ * elements");</pre>
+ *
+ * @param array
+ * the array to check
+ * @param message
+ * the exception message to use if the assertion fails
+ * @throws IllegalArgumentException
+ * if the object array contains a {@code null} element
+ */
+ def noNullElements(@Nullable array: Array[AnyRef], message: String): Unit = {
+ if (array != null) for (element <- array) {
+ if (element == null) throw new IllegalArgumentException(message)
+ }
+ }
+
+ /**
+ * Assert that a collection contains no {@code null} elements. <p>Note: Does not complain if the
+ * collection is empty! <pre class="code">AssertUtils.noNullElements(collection, "Collection must
+ * contain non-null elements");</pre>
+ *
+ * @param collection
+ * the collection to check
+ * @param message
+ * the exception message to use if the assertion fails
+ * @throws IllegalArgumentException
+ * if the collection contains a {@code null} element
+ */
+ def noNullElements(@Nullable collection: util.Collection[_], message: String): Unit = {
+ if (collection != null) for (element <- collection) {
+ if (element == null) {
+ throw new IllegalArgumentException(message)
+ }
+ }
+ }
+
+ /**
+ * Assert that the given String is not empty; that is, it must not be {@code null} and not the
+ * empty String. <pre class="code">AssertUtils.hasLength(name, "Name must not be empty");</pre>
+ *
+ * @param text
+ * the String to check
+ * @throws IllegalArgumentException
+ * if the text is empty
+ * @see
+ * StringUtils#hasLength
+ */
+ def hasLength(@Nullable text: String): Unit = {
+ if (!getHasLength(text)) {
+ throw new IllegalArgumentException()
+ }
+ }
+
+ /**
+ * Assert that the given String is not empty; that is, it must not be {@code null} and not the
+ * empty String. <pre class="code">AssertUtils.hasLength(name, "Name must not be empty");</pre>
+ *
+ * @param text
+ * the String to check
+ * @param message
+ * the exception message to use if the assertion fails
+ * @throws IllegalArgumentException
+ * if the text is empty
+ * @see
+ * StringUtils#hasLength
+ */
+ def hasLength(@Nullable text: String, message: String): Unit = {
+ if (!getHasLength(text)) {
+ throw new IllegalArgumentException(message)
+ }
+ }
+
+ /**
+ * Assert that the given String contains valid text content; that is, it must not be {@code null}
+ * and must contain at least one non-whitespace character. <pre
+ * class="code">AssertUtils.hasText(name, "'name' must not be empty");</pre>
+ *
+ * @param text
+ * the String to check
+ * @throws IllegalArgumentException
+ * if the text does not contain valid text content
+ * @see
+ * StringUtils#hasText
+ */
+ def hasText(@Nullable text: String): Unit = {
+ if (!getHasText(text)) {
+ throw new IllegalArgumentException()
+ }
+ }
+
+ /**
+ * Assert that the given String contains valid text content; that is, it must not be {@code null}
+ * and must contain at least one non-whitespace character. <pre
+ * class="code">AssertUtils.hasText(name, "'name' must not be empty");</pre>
+ *
+ * @param text
+ * the String to check
+ * @param message
+ * the exception message to use if the assertion fails
+ * @throws IllegalArgumentException
+ * if the text does not contain valid text content
+ * @see
+ * StringUtils#hasText
+ */
+ def hasText(@Nullable text: String, message: String): Unit = {
+ if (!getHasText(text)) {
+ throw new IllegalArgumentException(message)
+ }
+ }
+
+ private[this] def getHasLength(@Nullable str: String): Boolean = str != null && str.nonEmpty
+
+ private[this] def getHasText(@Nullable str: String): Boolean = {
+ str != null && str.nonEmpty && containsText(str)
+ }
+
+ private[this] def containsText(str: CharSequence): Boolean = {
+ val strLen = str.length
+ for (i <- 0 until strLen) {
+ if (!Character.isWhitespace(str.charAt(i))) {
+ return true
+ }
+ }
+ false
+ }
+
+}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ExceptionUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ExceptionUtils.scala
new file mode 100644
index 0000000..a382272
--- /dev/null
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ExceptionUtils.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.util
+
+import javax.annotation.Nonnull
+import javax.annotation.Nullable
+
+import java.io.PrintWriter
+import java.io.StringWriter
+
+object ExceptionUtils {
+
+ /**
+ * Stringify the exception object.
+ *
+ * @param throwable
+ * the target exception to stringify.
+ * @return
+ * the result of string-exception.
+ */
+ @Nonnull def stringifyException(@Nullable throwable: Throwable): String = {
+ if (throwable == null) {
+ return "(null)"
+ }
+ val stm = new StringWriter()
+ val writer = new PrintWriter(stm)
+ try {
+ throwable.printStackTrace(writer);
+ stm.toString
+ } catch {
+ case e: Exception => s"${e.getClass.getName} (error while printing stack trace)";
+ case _ => null
+ } finally {
+ Utils.close(writer, stm)
+ }
+ }
+
+ @FunctionalInterface trait WrapperRuntimeExceptionHandler[I, O] {
+ @throws[Exception]
+ def handle(input: I): O
+ }
+
+ def wrapRuntimeException[I, O](input: I, handler: WrapperRuntimeExceptionHandler[I, O]): O = {
+ try handler.handle(input)
+ catch {
+ case e: Exception =>
+ throw new RuntimeException(e)
+ }
+ }
+}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 57cf5e5..e8fe7ef 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -92,9 +92,10 @@
def getPathFromEnv(env: String): String = {
val path = Option(System.getenv(env)).getOrElse(System.getProperty(env))
- require(
- Utils.requireNotEmpty(path),
- s"[StreamPark] FileUtils.getPathFromEnv: $env is not set on system env")
+ AssertUtils.notNull(
+ path,
+ s"[StreamPark] FileUtils.getPathFromEnv: $env is not set on system env"
+ )
val file = new File(path)
require(file.exists(), s"[StreamPark] FileUtils.getPathFromEnv: $env is not exist!")
file.getAbsolutePath
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
index faf1fc9..ac097b1 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
@@ -134,7 +134,7 @@
val shadedPackage = "org.apache.streampark.shaded"
override def configureByResource(url: URL): Unit = {
- Utils.requireNotNull(url, "URL argument cannot be null")
+ AssertUtils.notNull(url, "URL argument cannot be null")
val path = url.getPath
if (path.endsWith("xml")) {
val configurator = new JoranConfigurator()
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 541f57f..e3a0ae5 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -281,7 +281,7 @@
val map = mutable.Map[String, String]()
val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "")
simple.split("\\s?-D") match {
- case d if Utils.requireNotEmpty(d) =>
+ case d if Utils.isNotEmpty(d) =>
d.foreach(
x => {
if (x.nonEmpty) {
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index fcba212..ed27537 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -33,17 +33,7 @@
private[this] lazy val OS = System.getProperty("os.name").toLowerCase
- def requireNotNull(obj: Any, message: String): Unit = {
- if (obj == null) {
- throw new NullPointerException(message)
- }
- }
-
- def requireNotNull(obj: Any): Unit = {
- requireNotNull(obj, "this argument must not be null")
- }
-
- def requireNotEmpty(elem: Any): Boolean = {
+ def isNotEmpty(elem: Any): Boolean = {
elem match {
case null => false
case x if x.isInstanceOf[Array[_]] => elem.asInstanceOf[Array[_]].nonEmpty
@@ -56,19 +46,7 @@
}
}
- def isEmpty(elem: Any): Boolean = !requireNotEmpty(elem)
-
- def required(expression: Boolean): Unit = {
- if (!expression) {
- throw new IllegalArgumentException
- }
- }
-
- def required(expression: Boolean, errorMessage: Any): Unit = {
- if (!expression) {
- throw new IllegalArgumentException(s"Requirement failed: ${errorMessage.toString}")
- }
- }
+ def isEmpty(elem: Any): Boolean = !isNotEmpty(elem)
def uuid(): String = UUID.randomUUID().toString.replaceAll("-", "")
diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/UtilsTest.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/UtilsTest.scala
index afc5f04..5f7a5c8 100644
--- a/streampark-common/src/test/scala/org/apache/streampark/common/util/UtilsTest.scala
+++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/UtilsTest.scala
@@ -30,31 +30,31 @@
test("requiredNotNull should throw NullPointerException if argument is null") {
val nullPointerException = intercept[NullPointerException] {
- Utils.requireNotNull(null, "object can't be null")
+ AssertUtils.notNull(null, "object can't be null")
}
assert(nullPointerException.getMessage == "object can't be null")
}
test("requireNotEmpty should check if argument is not empty") {
- assert(!Utils.requireNotEmpty(null))
- assert(Utils.requireNotEmpty(Array(1)))
- assert(Utils.requireNotEmpty("string"))
- assert(Utils.requireNotEmpty(Seq("Seq")))
- assert(Utils.requireNotEmpty(Iterable("Iterable")))
+ assert(!Utils.isNotEmpty(null))
+ assert(Utils.isNotEmpty(Array(1)))
+ assert(Utils.isNotEmpty("string"))
+ assert(Utils.isNotEmpty(Seq("Seq")))
+ assert(Utils.isNotEmpty(Iterable("Iterable")))
val arrayList = new util.ArrayList[String](16)
arrayList.add("arrayList")
- assert(Utils.requireNotEmpty(arrayList))
+ assert(Utils.isNotEmpty(arrayList))
val hashMap = new util.HashMap[String, String](16)
hashMap.put("hash", "map")
- assert(Utils.requireNotEmpty(hashMap))
- assert(Utils.requireNotEmpty())
+ assert(Utils.isNotEmpty(hashMap))
+ assert(Utils.isNotEmpty())
}
test("required should throw IllegalArgumentException if condition is false") {
val illegalArgumentException = intercept[IllegalArgumentException] {
- Utils.required(false)
+ AssertUtils.required(false)
}
assert(illegalArgumentException.getMessage == null)
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java
index d9eb147..e93737d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/Dependency.java
@@ -43,7 +43,7 @@
@SneakyThrows
public static Dependency toDependency(String dependency) {
- if (Utils.requireNotEmpty(dependency)) {
+ if (Utils.isNotEmpty(dependency)) {
return JacksonUtils.read(dependency, new TypeReference<Dependency>() {});
}
return new Dependency();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
index f3e713e..ab45f2d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.component;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.SavePoint;
@@ -28,8 +29,6 @@
import org.apache.streampark.console.core.service.application.ApplicationActionService;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
-import org.apache.flink.shaded.guava30.com.google.common.base.Preconditions;
-
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.Data;
@@ -120,10 +119,9 @@
checkPointFailedCache.remove(appId);
FailoverStrategyEnum failoverStrategyEnum =
FailoverStrategyEnum.of(application.getCpFailureAction());
- Preconditions.checkArgument(
+ AssertUtils.required(
failoverStrategyEnum != null,
- "Unexpected cpFailureAction: %s",
- application.getCpFailureAction());
+ "Unexpected cpFailureAction: " + application.getCpFailureAction());
processFailoverStrategy(application, failoverStrategyEnum);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
index 5af4fc0..22076c5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ExternalLinkController.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.controller;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.entity.ExternalLink;
import org.apache.streampark.console.core.service.ExternalLinkService;
@@ -82,7 +82,7 @@
@PostMapping("/update")
@RequiresPermissions("externalLink:update")
public RestResponse update(@Valid ExternalLink externalLink) {
- Utils.requireNotNull(externalLink.getId(), "The link id cannot be null");
+ AssertUtils.notNull(externalLink.getId(), "The link id cannot be null");
externalLinkService.update(externalLink);
return RestResponse.success();
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
index 1005cb3..9f5bd6c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
@@ -20,6 +20,7 @@
import org.apache.streampark.common.conf.CommonConfig;
import org.apache.streampark.common.conf.InternalConfigHolder;
import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.streampark.console.base.util.GitUtils;
@@ -28,7 +29,6 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.shaded.guava30.com.google.common.base.Preconditions;
import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.IdType;
@@ -40,8 +40,6 @@
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jgit.lib.Constants;
-import javax.annotation.Nonnull;
-
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
@@ -188,6 +186,33 @@
@JsonIgnore
public String getMavenArgs() {
+ // 1) check build args
+ String buildArg = getMvnBuildArgs();
+ StringBuilder argBuilder = new StringBuilder();
+ if (StringUtils.isNotBlank(buildArg)) {
+ argBuilder.append(buildArg);
+ }
+
+ // 2) mvn setting file
+ String mvnSetting = getMvnSetting();
+ if (StringUtils.isNotBlank(mvnSetting)) {
+ argBuilder.append(" --settings ").append(mvnSetting);
+ }
+
+ // 3) check args
+ String cmd = argBuilder.toString();
+ String illegalArg = getIllegalArgs(cmd);
+ if (illegalArg != null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Invalid maven argument, illegal args: %s, in your maven args: %s", illegalArg, cmd));
+ }
+
+ String mvn = getMvn();
+ return mvn.concat(" ").concat(cmd);
+ }
+
+ private String getMvn() {
boolean windows = Utils.isWindows();
String mvn = windows ? "mvn.cmd" : "mvn";
@@ -200,7 +225,7 @@
try {
Process process = Runtime.getRuntime().exec(mvn + " --version");
process.waitFor();
- Utils.required(process.exitValue() == 0);
+ AssertUtils.required(process.exitValue() == 0);
useWrapper = false;
} catch (Exception ignored) {
log.warn("try using user-installed maven failed, now use maven-wrapper.");
@@ -210,44 +235,32 @@
if (useWrapper) {
mvn = WebUtils.getAppHome().concat(windows ? "/bin/mvnw.cmd" : "/bin/mvnw");
}
-
- return renderCmd(mvn);
+ return mvn;
}
- @Nonnull
- private String renderCmd(String mvn) {
- StringBuilder cmdBuffer = new StringBuilder(mvn).append(" clean package -DskipTests ");
- renderCmdByBuildArgs(cmdBuffer);
- renderCmdBySetting(cmdBuffer);
- return cmdBuffer.toString();
- }
-
- private void renderCmdByBuildArgs(StringBuilder cmdBuffer) {
+ private String getMvnBuildArgs() {
if (StringUtils.isNotBlank(this.buildArgs)) {
String args = getIllegalArgs(this.buildArgs);
- Preconditions.checkArgument(
+ AssertUtils.required(
args == null,
- "Illegal argument: \"%s\" in maven build parameters: %s",
- args,
- this.buildArgs);
- cmdBuffer.append(this.buildArgs.trim());
+ String.format(
+ "Illegal argument: \"%s\" in maven build parameters: %s", args, this.buildArgs));
+ return this.buildArgs.trim();
}
+ return null;
}
- private void renderCmdBySetting(StringBuilder cmdBuffer) {
+ private String getMvnSetting() {
String setting = InternalConfigHolder.get(CommonConfig.MAVEN_SETTINGS_PATH());
if (StringUtils.isBlank(setting)) {
- return;
+ return null;
}
- String args = getIllegalArgs(setting);
- Preconditions.checkArgument(
- args == null, "Illegal argument \"%s\" in maven-setting file path: %s", args, setting);
File file = new File(setting);
- Preconditions.checkArgument(
+ AssertUtils.required(
!file.exists() || !file.isFile(),
- "Invalid maven-setting file path \"%s\", the path not exist or is not file",
- setting);
- cmdBuffer.append(" --settings ").append(setting);
+ String.format(
+ "Invalid maven-setting file path \"%s\", the path not exist or is not file", setting));
+ return setting;
}
private String getIllegalArgs(String param) {
@@ -257,7 +270,7 @@
return matcher.group(1) == null ? matcher.group(2) : matcher.group(1);
}
- Iterator<String> iterator = Arrays.asList(";", "|", "&", ">").iterator();
+ Iterator<String> iterator = Arrays.asList(";", "|", "&", ">", "<").iterator();
String[] argsList = param.split("\\s+");
while (iterator.hasNext()) {
String chr = iterator.next();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 635c2a4..3454dd8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -24,8 +24,8 @@
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.SystemPropertyUtils;
-import org.apache.streampark.common.util.Utils;
import org.apache.streampark.common.zio.ZIOExt;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.entity.FlinkEnv;
@@ -101,7 +101,7 @@
.forEach(
key -> {
InternalOption config = InternalConfigHolder.getConfig(key);
- Utils.requireNotNull(config);
+ AssertUtils.notNull(config);
InternalConfigHolder.set(config, env.getProperty(key, config.classType()));
});
@@ -181,7 +181,7 @@
private void uploadClientJar(Workspace workspace, FsOperator fsOperator) {
File client = WebUtils.getAppClientDir();
- Utils.required(
+ AssertUtils.required(
client.exists() && client.listFiles().length > 0,
client.getAbsolutePath().concat(" is not exists or empty directory "));
@@ -198,7 +198,7 @@
File[] shims =
WebUtils.getAppLibDir()
.listFiles(pathname -> pathname.getName().matches(PATTERN_FLINK_SHIMS_JAR.pattern()));
- Utils.required(shims != null && shims.length > 0, "streampark-flink-shims jar not exist");
+ AssertUtils.required(shims != null && shims.length > 0, "streampark-flink-shims jar not exist");
String appShims = workspace.APP_SHIMS();
fsOperator.delete(appShims);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 2974faa..60047f4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -27,11 +27,11 @@
import org.apache.streampark.common.enums.FlinkRestoreMode;
import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.PropertiesUtils;
-import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApplicationException;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
@@ -381,7 +381,7 @@
public void start(Application appParam, boolean auto) throws Exception {
// 1) check application
final Application application = getById(appParam.getId());
- Utils.requireNotNull(application);
+ AssertUtils.notNull(application);
ApiAlertException.throwIfTrue(
!application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly.");
@@ -397,7 +397,7 @@
}
AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
- Utils.requireNotNull(buildPipeline);
+ AssertUtils.notNull(buildPipeline);
FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
ApiAlertException.throwIfNull(flinkEnv, "[StreamPark] can no found flink version");
@@ -635,7 +635,7 @@
switch (application.getDevelopmentMode()) {
case FLINK_SQL:
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
- Utils.requireNotNull(flinkSql);
+ AssertUtils.notNull(flinkSql);
// 1) dist_userJar
String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
// 2) appConfig
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index ffcafc2..3370e66 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -24,9 +24,9 @@
import org.apache.streampark.common.enums.FlinkDevelopmentMode;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.fs.FsOperator;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.FileUtils;
-import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.base.util.WebUtils;
@@ -194,7 +194,7 @@
FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false);
if (app.isFlinkSqlJobOrPyFlinkJob()) {
FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql;
- Utils.requireNotNull(flinkSql);
+ AssertUtils.notNull(flinkSql);
app.setDependency(flinkSql.getDependency());
app.setTeamResource(flinkSql.getTeamResource());
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
index 1a88494..ebf979c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ExternalLinkServiceImpl.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.service.impl;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ExternalLink;
import org.apache.streampark.console.core.enums.PlaceholderTypeEnum;
@@ -77,7 +77,7 @@
@Override
public List<ExternalLink> render(Long appId) {
Application app = applicationManageService.getById(appId);
- Utils.requireNotNull(app, "Application doesn't exist");
+ AssertUtils.notNull(app, "Application doesn't exist");
List<ExternalLink> externalLink = this.list();
if (externalLink != null && externalLink.size() > 0) {
// Render the placeholder
@@ -112,10 +112,10 @@
if (result == null) {
return true;
}
- Utils.required(
+ AssertUtils.required(
!result.getBadgeName().equals(params.getBadgeName()),
String.format("The name: %s is already existing.", result.getBadgeName()));
- Utils.required(
+ AssertUtils.required(
!result.getLinkUrl().equals(params.getLinkUrl()),
String.format("The linkUrl: %s is already existing.", result.getLinkUrl()));
return false;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
index d648a22..3599ed8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSqlServiceImpl.java
@@ -17,9 +17,9 @@
package org.apache.streampark.console.core.service.impl;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.ExceptionUtils;
-import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.core.entity.Application;
@@ -172,11 +172,11 @@
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
public void rollback(Application application) {
FlinkSql sql = getCandidate(application.getId(), CandidateTypeEnum.HISTORY);
- Utils.requireNotNull(sql);
+ AssertUtils.notNull(sql);
try {
// check and backup current job
FlinkSql effectiveSql = getEffective(application.getId(), false);
- Utils.requireNotNull(effectiveSql);
+ AssertUtils.notNull(effectiveSql);
// rollback history sql
backUpService.rollbackFlinkSql(application, sql);
} catch (Exception e) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index cdb1859..778b1a0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -21,9 +21,9 @@
import org.apache.streampark.common.conf.CommonConfig;
import org.apache.streampark.common.conf.InternalConfigHolder;
import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.FileUtils;
-import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.ResponseCode;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
@@ -109,7 +109,7 @@
@Override
public boolean update(Project projectParam) {
Project project = getById(projectParam.getId());
- Utils.requireNotNull(project);
+ AssertUtils.notNull(project);
ApiAlertException.throwIfFalse(
project.getTeamId().equals(projectParam.getTeamId()),
"TeamId can't be changed, update project failed.");
@@ -155,7 +155,7 @@
@Override
public boolean removeById(Long id) {
Project project = getById(id);
- Utils.requireNotNull(project);
+ AssertUtils.notNull(project);
LambdaQueryWrapper<Application> queryWrapper =
new LambdaQueryWrapper<Application>().eq(Application::getProjectId, id);
long count = applicationManageService.count(queryWrapper);
@@ -227,7 +227,7 @@
@Override
public List<String> listModules(Long id) {
Project project = getById(id);
- Utils.requireNotNull(project);
+ AssertUtils.notNull(project);
if (BuildStateEnum.SUCCESSFUL != BuildStateEnum.of(project.getBuildState())
|| !project.getDistHome().exists()) {
@@ -293,7 +293,7 @@
}
List<Map<String, Object>> confList = new ArrayList<>();
File[] files = unzipFile.listFiles(x -> "conf".equals(x.getName()));
- Utils.requireNotNull(files);
+ AssertUtils.notNull(files);
for (File item : files) {
eachFile(item, confList, true);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 8616e5a..c973be1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -18,9 +18,9 @@
package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.enums.FlinkExecutionMode;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.ExceptionUtils;
-import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.InternalException;
@@ -288,7 +288,7 @@
Map<String, Object> properties = new HashMap<>();
if (FlinkExecutionMode.isRemoteMode(application.getFlinkExecutionMode())) {
- Utils.requireNotNull(
+ AssertUtils.notNull(
cluster,
String.format(
"The clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
@@ -306,7 +306,7 @@
}
if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
if (FlinkExecutionMode.YARN_SESSION == application.getFlinkExecutionMode()) {
- Utils.requireNotNull(
+ AssertUtils.notNull(
cluster,
String.format(
"The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
@@ -374,7 +374,7 @@
// At the remote mode, request the flink webui interface to get the savepoint path
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
- Utils.requireNotNull(
+ AssertUtils.notNull(
cluster,
String.format(
"The clusterId=%s cannot be find, maybe the clusterId is wrong or "
@@ -439,8 +439,8 @@
private void expire(SavePoint entity) {
FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
Application application = applicationManageService.getById(entity.getAppId());
- Utils.requireNotNull(flinkEnv);
- Utils.requireNotNull(application);
+ AssertUtils.notNull(flinkEnv);
+ AssertUtils.notNull(application);
int cpThreshold =
tryGetChkNumRetainedFromDynamicProps(application.getDynamicProperties())
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
index 0b70f9d..d2ba899 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlCompleteServiceImpl.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.service.impl;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.console.core.service.SqlCompleteService;
import com.google.common.collect.Sets;
@@ -56,7 +56,7 @@
@Override
public List<String> getComplete(String sql) {
- if (sql.length() > 0 && BLACK_SET.contains(sql.charAt(sql.length() - 1))) {
+ if (!sql.isEmpty() && BLACK_SET.contains(sql.charAt(sql.length() - 1))) {
return new ArrayList<>();
}
String[] temp = sql.split("\\s");
@@ -184,7 +184,7 @@
nowStep = nowStep.get(nowChar).getNext();
loc += 1;
}
- Utils.requireNotNull(preNode);
+ AssertUtils.notNull(preNode);
preNode.setStop();
preNode.setCount(count);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
index 9860654..d52bdc9 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/YarnQueueServiceImpl.java
@@ -18,7 +18,7 @@
package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.enums.FlinkExecutionMode;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
@@ -73,8 +73,8 @@
@Override
public IPage<YarnQueue> getPage(YarnQueue yarnQueue, RestRequest request) {
- Utils.requireNotNull(yarnQueue, "Yarn queue query params mustn't be null.");
- Utils.requireNotNull(
+ AssertUtils.notNull(yarnQueue, "Yarn queue query params mustn't be null.");
+ AssertUtils.notNull(
yarnQueue.getTeamId(), "Team id of yarn queue query params mustn't be null.");
Page<YarnQueue> page = MybatisPager.getPage(request);
return this.baseMapper.selectPage(page, yarnQueue);
@@ -88,8 +88,8 @@
@Override
public ResponseResult<String> checkYarnQueue(YarnQueue yarnQueue) {
- Utils.requireNotNull(yarnQueue, "Yarn queue mustn't be empty.");
- Utils.requireNotNull(yarnQueue.getTeamId(), "Team id mustn't be null.");
+ AssertUtils.notNull(yarnQueue, "Yarn queue mustn't be empty.");
+ AssertUtils.notNull(yarnQueue.getTeamId(), "Team id mustn't be null.");
ResponseResult<String> responseResult = new ResponseResult<>();
@@ -206,8 +206,8 @@
@VisibleForTesting
public YarnQueue getYarnQueueByIdWithPreconditions(YarnQueue yarnQueue) {
- Utils.requireNotNull(yarnQueue, "Yarn queue mustn't be null.");
- Utils.requireNotNull(yarnQueue.getId(), "Yarn queue id mustn't be null.");
+ AssertUtils.notNull(yarnQueue, "Yarn queue mustn't be null.");
+ AssertUtils.notNull(yarnQueue.getId(), "Yarn queue id mustn't be null.");
YarnQueue queueFromDB = getById(yarnQueue.getId());
ApiAlertException.throwIfNull(queueFromDB, "The queue doesn't exist.");
return queueFromDB;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
index e2154d0..a03c74e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/MemberServiceImpl.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.system.service.impl;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
@@ -152,8 +152,10 @@
public void updateMember(Member member) {
Member oldMember = this.getById(member.getId());
ApiAlertException.throwIfNull(oldMember, "The member [id=%s] not found", member.getId());
- Utils.required(oldMember.getTeamId().equals(member.getTeamId()), "Team id cannot be changed.");
- Utils.required(oldMember.getUserId().equals(member.getUserId()), "User id cannot be changed.");
+ AssertUtils.state(
+ oldMember.getTeamId().equals(member.getTeamId()), "Team id cannot be changed.");
+ AssertUtils.state(
+ oldMember.getUserId().equals(member.getUserId()), "User id cannot be changed.");
ApiAlertException.throwIfNull(
roleService.getById(member.getRoleId()), "The roleId [%s] not found", member.getRoleId());
oldMember.setRoleId(member.getRoleId());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
index 1ae53fe..8ce2516 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/service/impl/UserServiceImpl.java
@@ -17,8 +17,8 @@
package org.apache.streampark.console.system.service.impl;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.DateUtils;
-import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.ResponseCode;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
@@ -93,7 +93,7 @@
public IPage<User> getPage(User user, RestRequest request) {
Page<User> page = MybatisPager.getPage(request);
IPage<User> resPage = this.baseMapper.selectPage(page, user);
- Utils.requireNotNull(resPage);
+ AssertUtils.notNull(resPage);
if (resPage.getTotal() == 0) {
resPage.setRecords(Collections.emptyList());
}
@@ -197,7 +197,7 @@
@Override
public void setLastTeam(Long teamId, Long userId) {
User user = getById(userId);
- Utils.requireNotNull(user);
+ AssertUtils.notNull(user);
user.setLastTeamId(teamId);
this.baseMapper.updateById(user);
}
@@ -205,7 +205,7 @@
@Override
public void clearLastTeam(Long userId, Long teamId) {
User user = getById(userId);
- Utils.requireNotNull(user);
+ AssertUtils.notNull(user);
if (!teamId.equals(user.getLastTeamId())) {
return;
}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
index 398fce3..c7fd108 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
@@ -19,10 +19,10 @@
import org.apache.streampark.common.conf.CommonConfig;
import org.apache.streampark.common.conf.ConfigKeys;
+import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.SystemPropertyUtils;
import org.apache.commons.io.FileUtils;
-import org.apache.flink.util.Preconditions;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
@@ -104,13 +104,12 @@
}
private static File tryFindStreamParkPackagedDirFile() {
- String userDir = Preconditions.checkNotNull(SystemPropertyUtils.get("user.dir"));
+ String userDir = AssertUtils.notNull(SystemPropertyUtils.get("user.dir"));
File pkgTargetDirFile = new File(userDir, "target");
- Preconditions.checkState(
+ AssertUtils.state(
pkgTargetDirFile.exists(),
- "The target directory of %s doesn't exist. %s",
- userDir,
- RUN_PKG_SCRIPT_HINT);
+ String.format(
+ "The target directory of %s doesn't exist. %s", userDir, RUN_PKG_SCRIPT_HINT));
Optional<File> availablePkgParentFileOpt =
Arrays.stream(requireNonNull(pkgTargetDirFile.listFiles(PKG_NAME_FILTER))).findFirst();
final File availablePkgParentFile =
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala
index e06a9a1..142bedc 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/DockerRetriever.scala
@@ -61,7 +61,7 @@
/** set docker-host for kata */
def setDockerHost(): Unit = {
val dockerhost: String = InternalConfigHolder.get(CommonConfig.DOCKER_HOST)
- if (Utils.requireNotEmpty(dockerhost)) {
+ if (Utils.isNotEmpty(dockerhost)) {
val dockerHostUri: URI = new URI(dockerhost)
dockerHttpClientBuilder.dockerHost(dockerHostUri)
}
diff --git a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
index f1a8b47..326ff17 100644
--- a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
+++ b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
@@ -17,7 +17,7 @@
package org.apache.streampark.testcontainer.flink;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.AssertUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,9 +98,9 @@
@Override
public void start() {
- Utils.requireNotNull(jobManagerContainer);
+ AssertUtils.notNull(jobManagerContainer);
jobManagerContainer.start();
- Utils.requireNotNull(taskManagerContainers);
+ AssertUtils.notNull(taskManagerContainers);
for (FlinkContainer taskManagerContainer : taskManagerContainers) {
taskManagerContainer.start();
}
@@ -108,11 +108,11 @@
@Override
public void stop() {
- Utils.requireNotNull(taskManagerContainers);
+ AssertUtils.notNull(taskManagerContainers);
for (FlinkContainer taskManagerContainer : taskManagerContainers) {
taskManagerContainer.stop();
}
- Utils.requireNotNull(jobManagerContainer);
+ AssertUtils.notNull(jobManagerContainer);
jobManagerContainer.stop();
}
@@ -151,13 +151,13 @@
}
public Builder taskManagerNum(int taskManagerNum) {
- Utils.required(taskManagerNum >= 0, "taskManagerNum must be greater than -1.");
+ AssertUtils.required(taskManagerNum >= 0, "taskManagerNum must be greater than -1.");
this.taskManagerNum = taskManagerNum;
return this;
}
public Builder slotsNumPerTm(int slotsNumPerTm) {
- Utils.required(slotsNumPerTm > 0, "slotsNumPerTm must be greater than 0.");
+ AssertUtils.required(slotsNumPerTm > 0, "slotsNumPerTm must be greater than 0.");
this.slotsNumPerTm = slotsNumPerTm;
return this;
}