[ISSUE-3087][Improve] Improve streampark-flink module based on [3.6 Control/Condition Statements] (#3645)

[ISSUE-3087][Improve] Improve streampark-flink module based on [3.6 Control/Condition Statements] (#3645)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index e268401..de9a75d 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -235,11 +235,10 @@
       kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
       val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
 
-      if (
-        shutDownRequest.clusterId != null && kubeClientWrapper
-          .getService(shutDownRequest.clusterId)
-          .isPresent
-      ) {
+      val stopAndCleanupState = shutDownRequest.clusterId != null && kubeClientWrapper
+        .getService(shutDownRequest.clusterId)
+        .isPresent
+      if (stopAndCleanupState) {
         kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
         ShutDownResponse(shutDownRequest.clusterId)
       } else {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index ec0a809..b2d3c84 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -144,13 +144,14 @@
       flinkConfig: Configuration): SubmitResponse = {
     var proxyUserUgi: UserGroupInformation = UserGroupInformation.getCurrentUser
     val currentUser = UserGroupInformation.getCurrentUser
-    if (!HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
-      if (StringUtils.isNotEmpty(submitRequest.hadoopUser)) {
-        proxyUserUgi = UserGroupInformation.createProxyUser(
-          submitRequest.hadoopUser,
-          currentUser
-        )
-      }
+    val eableProxyState =
+      !HadoopUtils.isKerberosSecurityEnabled(currentUser) && StringUtils.isNotEmpty(
+        submitRequest.hadoopUser)
+    if (eableProxyState) {
+      proxyUserUgi = UserGroupInformation.createProxyUser(
+        submitRequest.hadoopUser,
+        currentUser
+      )
     }
 
     proxyUserUgi.doAs[SubmitResponse](new PrivilegedAction[SubmitResponse] {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index c91fc45..9aaa133 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -259,12 +259,11 @@
       flinkConfig.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
       val yarnClusterDescriptor = getSessionClusterDescriptor(flinkConfig)
       clusterDescriptor = yarnClusterDescriptor._2
-      if (
-        FinalApplicationStatus.UNDEFINED.equals(
-          clusterDescriptor.getYarnClient
-            .getApplicationReport(ApplicationId.fromString(shutDownRequest.clusterId))
-            .getFinalApplicationStatus)
-      ) {
+      val shutDownState = FinalApplicationStatus.UNDEFINED.equals(
+        clusterDescriptor.getYarnClient
+          .getApplicationReport(ApplicationId.fromString(shutDownRequest.clusterId))
+          .getFinalApplicationStatus)
+      if (shutDownState) {
         val clientProvider = clusterDescriptor.retrieve(yarnClusterDescriptor._1)
         client = clientProvider.getClusterClient
         client.shutDownCluster()
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index a31250d..5cf2a9d 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -136,10 +136,9 @@
       flinkConfig.setBoolean(
         SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
         submitRequest.allowNonRestoredState)
-      if (
-        submitRequest.flinkVersion.checkVersion(
-          FlinkRestoreMode.SINCE_FLINK_VERSION) && submitRequest.restoreMode != null
-      ) {
+      val eableRestoreModeState = submitRequest.flinkVersion.checkVersion(
+        FlinkRestoreMode.SINCE_FLINK_VERSION) && submitRequest.restoreMode != null
+      if (eableRestoreModeState) {
         flinkConfig.setString(FlinkRestoreMode.RESTORE_MODE, submitRequest.restoreMode.getName);
       }
     }
@@ -473,17 +472,17 @@
     }
 
     // execution.runtime-mode
-    if (submitRequest.properties.nonEmpty) {
-      if (submitRequest.properties.containsKey(ExecutionOptions.RUNTIME_MODE.key())) {
-        programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
-        programArgs += submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key()).toString
-      }
+    val addRuntimeModeState =
+      submitRequest.properties.nonEmpty && submitRequest.properties.containsKey(
+        ExecutionOptions.RUNTIME_MODE.key())
+    if (addRuntimeModeState) {
+      programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
+      programArgs += submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key()).toString
     }
 
-    if (
-      submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK
-      && submitRequest.executionMode != FlinkExecutionMode.YARN_APPLICATION
-    ) {
+    val addUserJarFileState =
+      submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK && submitRequest.executionMode != FlinkExecutionMode.YARN_APPLICATION
+    if (addUserJarFileState) {
       // python file
       programArgs.add("-py")
       programArgs.add(submitRequest.userJarFile.getAbsolutePath)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 49958a2..bbaf5e0 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -47,20 +47,20 @@
         KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
         covertToServiceExposedType(submitRequest.k8sSubmitParam.flinkRestExposedType.get))
 
-    if (submitRequest.buildResult != null) {
-      if (submitRequest.executionMode == FlinkExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
-        val buildResult = submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse]
-        buildResult.podTemplatePaths.foreach(
-          p => {
-            if (PodTemplateTool.KUBERNETES_POD_TEMPLATE.key.equals(p._1)) {
-              flinkConfig.safeSet(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, p._2)
-            } else if (PodTemplateTool.KUBERNETES_JM_POD_TEMPLATE.key.equals(p._1)) {
-              flinkConfig.safeSet(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, p._2)
-            } else if (PodTemplateTool.KUBERNETES_TM_POD_TEMPLATE.key.equals(p._1)) {
-              flinkConfig.safeSet(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, p._2)
-            }
-          })
-      }
+    val addBuildParamState =
+      submitRequest.buildResult != null && submitRequest.executionMode == FlinkExecutionMode.KUBERNETES_NATIVE_APPLICATION
+    if (addBuildParamState) {
+      val buildResult = submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse]
+      buildResult.podTemplatePaths.foreach(
+        p => {
+          if (PodTemplateTool.KUBERNETES_POD_TEMPLATE.key.equals(p._1)) {
+            flinkConfig.safeSet(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, p._2)
+          } else if (PodTemplateTool.KUBERNETES_JM_POD_TEMPLATE.key.equals(p._1)) {
+            flinkConfig.safeSet(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, p._2)
+          } else if (PodTemplateTool.KUBERNETES_TM_POD_TEMPLATE.key.equals(p._1)) {
+            flinkConfig.safeSet(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, p._2)
+          }
+        })
     }
 
     // add flink conf configuration, mainly to set the log4j configuration
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateParser.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateParser.scala
index e66e7fe..b515824 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateParser.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateParser.scala
@@ -70,10 +70,9 @@
           }))
     }
 
-    if (
-      root.containsKey("spec")
-      && Try(!root.get("spec").asInstanceOf[JMap[String, Any]].isEmpty).getOrElse(false)
-    ) {
+    val enableSpecState = root.containsKey("spec") && Try(
+      !root.get("spec").asInstanceOf[JMap[String, Any]].isEmpty).getOrElse(false)
+    if (enableSpecState) {
       res.put("spec", root.get("spec"))
     }
     yaml.dumpAsMap(res)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 927ffc0..aa7d57b 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -115,9 +115,9 @@
             case Some(jobState) =>
               val trackId = id.copy(jobId = jobState.jobId)
               val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
-              if (
+              val updateState =
                 latest == null || latest.jobState != jobState.jobState || latest.jobId != jobState.jobId
-              ) {
+              if (updateState) {
                 // put job status to cache
                 watchController.jobStatuses.put(trackId, jobState)
                 // set jobId to trackIds
@@ -327,9 +327,9 @@
       pollEmitTime = pollEmitTime,
       pollAckTime = System.currentTimeMillis)
 
-    if (
+    val updateJobState =
       jobState == FlinkJobStateEnum.SILENT && latest != null && latest.jobState == FlinkJobStateEnum.SILENT
-    ) {
+    if (updateJobState) {
       Some(jobStatusCV.copy(pollEmitTime = latest.pollEmitTime, pollAckTime = latest.pollAckTime))
     } else {
       Some(jobStatusCV)
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index c424b3b..186d444 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -65,11 +65,11 @@
         "central",
         Constant.DEFAULT,
         InternalConfigHolder.get(MAVEN_REMOTE_URL))
-    val remoteRepository =
-      if (
+    val remoteRepository = {
+      val buildState =
         InternalConfigHolder.get(MAVEN_AUTH_USER) == null || InternalConfigHolder.get(
           MAVEN_AUTH_PASSWORD) == null
-      ) {
+      if (buildState) {
         builder.build()
       } else {
         val authentication = new AuthenticationBuilder()
@@ -78,6 +78,7 @@
           .build()
         builder.setAuthentication(authentication).build()
       }
+    }
     List(remoteRepository)
   }
 
@@ -253,11 +254,11 @@
     override def canFilter(jar: File): Boolean = true
 
     override def isFiltered(name: String): Boolean = {
-      if (name.startsWith("META-INF/")) {
-        if (name.endsWith(".SF") || name.endsWith(".DSA") || name.endsWith(".RSA")) {
-          logInfo(s"shade ignore file: $name")
-          return true
-        }
+      val isFilteredState = name.startsWith("META-INF/") && name.endsWith(".SF") || name.endsWith(
+        ".DSA") || name.endsWith(".RSA")
+      if (isFilteredState) {
+        logInfo(s"shade ignore file: $name")
+        return true
       }
       false
     }
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
index eb9b402..9c8e944 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
@@ -37,10 +37,10 @@
     var lastTime: Long) {
   // noinspection DuplicatedCode
   def update(pullRsp: PullResponseItem): Unit = {
-    if (
+    val nonPullUpdateState =
       pullRsp == null || StringUtils.isBlank(pullRsp.getId) || StringUtils.isBlank(
         pullRsp.getStatus)
-    ) {
+    if (nonPullUpdateState) {
       return
     }
     if (pullRsp.getStatus.contains("complete")) {
@@ -79,10 +79,10 @@
     var lastTime: Long) {
   // noinspection DuplicatedCode
   def update(pushRsp: PushResponseItem): Unit = {
-    if (
+    val nonPushUpdateState =
       pushRsp == null || StringUtils.isBlank(pushRsp.getId) || StringUtils.isBlank(
         pushRsp.getStatus)
-    ) {
+    if (nonPushUpdateState) {
       return
     }
     if (pushRsp.getStatus.contains("complete")) {
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index ba8af63..25849fa 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -138,10 +138,9 @@
           val pullImageCmd = {
             // when the register address prefix is explicitly identified on base image tag,
             // the user's pre-saved docker register auth info would be used.
-            if (
-              dockerConf.registerAddress != null && !baseImageTag.startsWith(
-                dockerConf.registerAddress)
-            ) {
+            val pullImageCmdState = dockerConf.registerAddress != null && !baseImageTag.startsWith(
+              dockerConf.registerAddress)
+            if (pullImageCmdState) {
               dockerClient.pullImageCmd(baseImageTag)
             } else {
               dockerClient.pullImageCmd(baseImageTag).withAuthConfig(dockerConf.toAuthConf)
@@ -229,7 +228,9 @@
       registerAddress: String,
       imageNamespace: String): String = {
     var tagName = if (tag.contains("/")) tag else s"$imageNamespace/$tag"
-    if (StringUtils.isNotBlank(registerAddress) && !tagName.startsWith(registerAddress)) {
+    val addRegisterAddressState =
+      StringUtils.isNotBlank(registerAddress) && !tagName.startsWith(registerAddress)
+    if (addRegisterAddressState) {
       tagName = s"$registerAddress/$tagName"
     }
     tagName.toLowerCase
diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
index 7db1fbe..b6e9d7b 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
+++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
@@ -101,9 +101,9 @@
     if (urlClassLoaderResource != null && JAR_PROTOCOL == urlClassLoaderResource.getProtocol) {
       val spec = urlClassLoaderResource.getFile
       val filename = new File(spec.substring(0, spec.indexOf("!/"))).getName
-      if (
+      val matchState =
         FLINK_PATTERN.matcher(filename).matches && !flinkResourcePattern.matcher(filename).matches
-      ) {
+      if (matchState) {
         return null
       }
     }
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index 12a2fdc..19ced0e 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -641,15 +641,13 @@
   private[this] def isSingleLineComment(curChar: Char, nextChar: Char): Boolean = {
     var flag = false
     for (singleCommentPrefix <- singleLineCommentPrefixList) {
-      if (singleCommentPrefix.length == 1) {
-        if (curChar == singleCommentPrefix.charAt(0)) {
+      singleCommentPrefix.length match {
+        case 1 if curChar == singleCommentPrefix.charAt(0) => flag = true
+        case 2
+            if curChar == singleCommentPrefix.charAt(0) && nextChar == singleCommentPrefix.charAt(
+              1) =>
           flag = true
-        }
-      }
-      if (singleCommentPrefix.length == 2) {
-        if (curChar == singleCommentPrefix.charAt(0) && nextChar == singleCommentPrefix.charAt(1)) {
-          flag = true
-        }
+        case _ =>
       }
     }
     flag