[Improve] deploy flink job on k8s timeout improvement (#3425)

* [Improve] k8s timeout improvement

* [Improve] maven build args check improvement

---------

Co-authored-by: benjobs <benjobx@gmail.com>
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
index 882d94f..3ccbdac 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
@@ -46,6 +46,8 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -210,39 +212,49 @@
     StringBuilder cmdBuffer = new StringBuilder(mvn).append(" clean package -DskipTests ");
 
     if (StringUtils.isNotBlank(this.buildArgs)) {
-      List<String> dangerArgs = getDangerArgs(this.buildArgs);
-      if (dangerArgs.isEmpty()) {
+      String dangerArgs = getDangerArgs(this.buildArgs);
+      if (dangerArgs == null) {
         cmdBuffer.append(this.buildArgs.trim());
       } else {
         throw new IllegalArgumentException(
             String.format(
-                "Invalid build args, dangerous operation symbol detected: %s, in your buildArgs: %s",
-                dangerArgs.stream().collect(Collectors.joining(",")), this.buildArgs));
+                "Invalid maven argument, dangerous args: %s, in your buildArgs: %s",
+                dangerArgs, this.buildArgs));
       }
     }
 
     String setting = InternalConfigHolder.get(CommonConfig.MAVEN_SETTINGS_PATH());
     if (StringUtils.isNotBlank(setting)) {
-      List<String> dangerArgs = getDangerArgs(setting);
-      if (dangerArgs.isEmpty()) {
+      String dangerArgs = getDangerArgs(setting);
+      if (dangerArgs == null) {
         File file = new File(setting);
         if (file.exists() && file.isFile()) {
           cmdBuffer.append(" --settings ").append(setting);
         } else {
           throw new IllegalArgumentException(
-              String.format("Invalid maven setting path, %s no exists or not file", setting));
+              String.format("Invalid maven-setting file path, %s no exists or not file", setting));
         }
       } else {
         throw new IllegalArgumentException(
             String.format(
-                "Invalid maven setting path, dangerous operation symbol detected: %s, in your maven setting path: %s",
-                dangerArgs.stream().collect(Collectors.joining(",")), setting));
+                "Invalid maven-setting file path, dangerous args: %s, in your maven setting path: %s",
+                dangerArgs, setting));
       }
     }
     return cmdBuffer.toString();
   }
 
-  private List<String> getDangerArgs(String param) {
+  private String getDangerArgs(String param) {
+    Pattern pattern = Pattern.compile("(`.*?`)|(\\$\\((.*?)\\))");
+    Matcher matcher = pattern.matcher(param);
+    if (matcher.find()) {
+      String dangerArgs = matcher.group(1);
+      if (dangerArgs == null) {
+        dangerArgs = matcher.group(2);
+      }
+      return dangerArgs;
+    }
+
     String[] args = param.split("\\s+");
     List<String> dangerArgs = new ArrayList<>();
     for (String arg : args) {
@@ -263,7 +275,10 @@
         }
       }
     }
-    return dangerArgs;
+    if (!dangerArgs.isEmpty()) {
+      return dangerArgs.stream().collect(Collectors.joining(","));
+    }
+    return null;
   }
 
   @JsonIgnore
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
index e7d6736..63d5e8c 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
@@ -59,8 +59,8 @@
     // upload flink-job jar
     val uploadResult = Request
       .post(s"$jmRestUrl/jars/upload")
-      .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-      .responseTimeout(Timeout.ofSeconds(60))
+      .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+      .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
       .body(
         MultipartEntityBuilder
           .create()
@@ -90,8 +90,8 @@
     // refer to https://ci.apache.org/projects/flink/flink-docs-stable/docs/ops/rest_api/#jars-upload
     val resp = Request
       .post(s"$jmRestUrl/jars/${jarUploadResponse.jarId}/run")
-      .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-      .responseTimeout(Timeout.ofSeconds(60))
+      .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+      .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
       .body(new StringEntity(Serialization.write(new JarRunRequest(flinkConfig))))
       .execute
       .returnContent()
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index d10f81b..916096f 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -31,22 +31,22 @@
 import org.apache.flink.configuration.{Configuration, DeploymentOptions, RestOptions}
 import org.apache.flink.kubernetes.KubernetesClusterDescriptor
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
+import org.apache.hc.core5.util.Timeout
 
 import javax.annotation.Nullable
 
-import java.time.Duration
-
 import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
 
 object KubernetesRetriever extends Logger {
 
   // see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT}
-  val FLINK_CLIENT_TIMEOUT_SEC = 60L
+  val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
+    Timeout.ofMilliseconds(ClientOptions.CLIENT_TIMEOUT.defaultValue().toMillis)
+
   // see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
-  val FLINK_REST_AWAIT_TIMEOUT_SEC = 30L
-  // see org.apache.flink.configuration.RestOptions.RETRY_MAX_ATTEMPTS
-  val FLINK_REST_RETRY_MAX_ATTEMPTS = 30
+  val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
+    Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
 
   /** get new KubernetesClient */
   @throws(classOf[KubernetesClientException])
@@ -70,9 +70,11 @@
     val flinkConfig = new Configuration()
     flinkConfig.setString(DeploymentOptions.TARGET, executeMode.toString)
     flinkConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId)
-    flinkConfig.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofSeconds(FLINK_CLIENT_TIMEOUT_SEC))
-    flinkConfig.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, FLINK_REST_AWAIT_TIMEOUT_SEC * 1000)
-    flinkConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, FLINK_REST_RETRY_MAX_ATTEMPTS)
+    flinkConfig.set(ClientOptions.CLIENT_TIMEOUT, ClientOptions.CLIENT_TIMEOUT.defaultValue())
+    flinkConfig.set(
+      RestOptions.AWAIT_LEADER_TIMEOUT,
+      RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
+    flinkConfig.set(RestOptions.RETRY_MAX_ATTEMPTS, RestOptions.RETRY_MAX_ATTEMPTS.defaultValue())
     if (Try(namespace.isEmpty).getOrElse(true)) {
       flinkConfig.setString(
         KubernetesConfigOptions.NAMESPACE,
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
index 5ac00db..a35da00 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
@@ -115,8 +115,8 @@
         Checkpoint.as(
           Request
             .get(s"$flinkJmRestUrl/jobs/${trackId.jobId}/checkpoints")
-            .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-            .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
+            .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+            .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
             .execute
             .returnContent
             .asString(StandardCharsets.UTF_8)) match {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 7dbfcfd..927ffc0 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -266,8 +266,8 @@
     val jobDetails = JobDetails.as(
       Request
         .get(s"$restUrl/jobs/overview")
-        .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-        .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
+        .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+        .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
         .execute
         .returnContent()
         .asString(StandardCharsets.UTF_8))
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index 8fc80a3..7a57063 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -127,8 +127,8 @@
       .as(
         Request
           .get(s"$flinkJmRestUrl/overview")
-          .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-          .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
+          .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+          .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
           .execute
           .returnContent
           .asString(StandardCharsets.UTF_8))
@@ -140,8 +140,8 @@
         .as(
           Request
             .get(s"$flinkJmRestUrl/jobmanager/config")
-            .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
-            .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
+            .connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
+            .responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
             .execute
             .returnContent
             .asString(StandardCharsets.UTF_8))