[ISSUE-3087][Improve] Improve streampark-flink module based on [3.6 Control/Condition Statements] (#3645)
[ISSUE-3087][Improve] Improve streampark-flink module based on [3.6 Control/Condition Statements] (#3645)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index e268401..de9a75d 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -235,11 +235,10 @@
kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
- if (
- shutDownRequest.clusterId != null && kubeClientWrapper
- .getService(shutDownRequest.clusterId)
- .isPresent
- ) {
+ val stopAndCleanupState = shutDownRequest.clusterId != null && kubeClientWrapper
+ .getService(shutDownRequest.clusterId)
+ .isPresent
+ if (stopAndCleanupState) {
kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
ShutDownResponse(shutDownRequest.clusterId)
} else {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index ec0a809..b2d3c84 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -144,13 +144,14 @@
flinkConfig: Configuration): SubmitResponse = {
var proxyUserUgi: UserGroupInformation = UserGroupInformation.getCurrentUser
val currentUser = UserGroupInformation.getCurrentUser
- if (!HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
- if (StringUtils.isNotEmpty(submitRequest.hadoopUser)) {
- proxyUserUgi = UserGroupInformation.createProxyUser(
- submitRequest.hadoopUser,
- currentUser
- )
- }
+ val enableProxyState =
!HadoopUtils.isKerberosSecurityEnabled(currentUser) && StringUtils.isNotEmpty(
submitRequest.hadoopUser)
+ if (enableProxyState) {
+ proxyUserUgi = UserGroupInformation.createProxyUser(
+ submitRequest.hadoopUser,
+ currentUser
+ )
}
proxyUserUgi.doAs[SubmitResponse](new PrivilegedAction[SubmitResponse] {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index c91fc45..9aaa133 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -259,12 +259,11 @@
flinkConfig.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
val yarnClusterDescriptor = getSessionClusterDescriptor(flinkConfig)
clusterDescriptor = yarnClusterDescriptor._2
- if (
- FinalApplicationStatus.UNDEFINED.equals(
- clusterDescriptor.getYarnClient
- .getApplicationReport(ApplicationId.fromString(shutDownRequest.clusterId))
- .getFinalApplicationStatus)
- ) {
+ val shutDownState = FinalApplicationStatus.UNDEFINED.equals(
+ clusterDescriptor.getYarnClient
+ .getApplicationReport(ApplicationId.fromString(shutDownRequest.clusterId))
+ .getFinalApplicationStatus)
+ if (shutDownState) {
val clientProvider = clusterDescriptor.retrieve(yarnClusterDescriptor._1)
client = clientProvider.getClusterClient
client.shutDownCluster()
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index a31250d..5cf2a9d 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -136,10 +136,9 @@
flinkConfig.setBoolean(
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
submitRequest.allowNonRestoredState)
- if (
- submitRequest.flinkVersion.checkVersion(
- FlinkRestoreMode.SINCE_FLINK_VERSION) && submitRequest.restoreMode != null
- ) {
+ val enableRestoreModeState = submitRequest.flinkVersion.checkVersion(
FlinkRestoreMode.SINCE_FLINK_VERSION) && submitRequest.restoreMode != null
+ if (enableRestoreModeState) {
flinkConfig.setString(FlinkRestoreMode.RESTORE_MODE, submitRequest.restoreMode.getName);
}
}
@@ -473,17 +472,17 @@
}
// execution.runtime-mode
- if (submitRequest.properties.nonEmpty) {
- if (submitRequest.properties.containsKey(ExecutionOptions.RUNTIME_MODE.key())) {
- programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
- programArgs += submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key()).toString
- }
+ val addRuntimeModeState =
+ submitRequest.properties.nonEmpty && submitRequest.properties.containsKey(
+ ExecutionOptions.RUNTIME_MODE.key())
+ if (addRuntimeModeState) {
+ programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
+ programArgs += submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key()).toString
}
- if (
- submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK
- && submitRequest.executionMode != FlinkExecutionMode.YARN_APPLICATION
- ) {
+ val addUserJarFileState =
+ submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK && submitRequest.executionMode != FlinkExecutionMode.YARN_APPLICATION
+ if (addUserJarFileState) {
// python file
programArgs.add("-py")
programArgs.add(submitRequest.userJarFile.getAbsolutePath)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 49958a2..bbaf5e0 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -47,20 +47,20 @@
KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
covertToServiceExposedType(submitRequest.k8sSubmitParam.flinkRestExposedType.get))
- if (submitRequest.buildResult != null) {
- if (submitRequest.executionMode == FlinkExecutionMode.KUBERNETES_NATIVE_APPLICATION) {
- val buildResult = submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse]
- buildResult.podTemplatePaths.foreach(
- p => {
- if (PodTemplateTool.KUBERNETES_POD_TEMPLATE.key.equals(p._1)) {
- flinkConfig.safeSet(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, p._2)
- } else if (PodTemplateTool.KUBERNETES_JM_POD_TEMPLATE.key.equals(p._1)) {
- flinkConfig.safeSet(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, p._2)
- } else if (PodTemplateTool.KUBERNETES_TM_POD_TEMPLATE.key.equals(p._1)) {
- flinkConfig.safeSet(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, p._2)
- }
- })
- }
+ val addBuildParamState =
+ submitRequest.buildResult != null && submitRequest.executionMode == FlinkExecutionMode.KUBERNETES_NATIVE_APPLICATION
+ if (addBuildParamState) {
+ val buildResult = submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse]
+ buildResult.podTemplatePaths.foreach(
+ p => {
+ if (PodTemplateTool.KUBERNETES_POD_TEMPLATE.key.equals(p._1)) {
+ flinkConfig.safeSet(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, p._2)
+ } else if (PodTemplateTool.KUBERNETES_JM_POD_TEMPLATE.key.equals(p._1)) {
+ flinkConfig.safeSet(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, p._2)
+ } else if (PodTemplateTool.KUBERNETES_TM_POD_TEMPLATE.key.equals(p._1)) {
+ flinkConfig.safeSet(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, p._2)
+ }
+ })
}
// add flink conf configuration, mainly to set the log4j configuration
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateParser.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateParser.scala
index e66e7fe..b515824 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateParser.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateParser.scala
@@ -70,10 +70,9 @@
}))
}
- if (
- root.containsKey("spec")
- && Try(!root.get("spec").asInstanceOf[JMap[String, Any]].isEmpty).getOrElse(false)
- ) {
+ val enableSpecState = root.containsKey("spec") && Try(
+ !root.get("spec").asInstanceOf[JMap[String, Any]].isEmpty).getOrElse(false)
+ if (enableSpecState) {
res.put("spec", root.get("spec"))
}
yaml.dumpAsMap(res)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 927ffc0..aa7d57b 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -115,9 +115,9 @@
case Some(jobState) =>
val trackId = id.copy(jobId = jobState.jobId)
val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
- if (
+ val updateState =
latest == null || latest.jobState != jobState.jobState || latest.jobId != jobState.jobId
- ) {
+ if (updateState) {
// put job status to cache
watchController.jobStatuses.put(trackId, jobState)
// set jobId to trackIds
@@ -327,9 +327,9 @@
pollEmitTime = pollEmitTime,
pollAckTime = System.currentTimeMillis)
- if (
+ val updateJobState =
jobState == FlinkJobStateEnum.SILENT && latest != null && latest.jobState == FlinkJobStateEnum.SILENT
- ) {
+ if (updateJobState) {
Some(jobStatusCV.copy(pollEmitTime = latest.pollEmitTime, pollAckTime = latest.pollAckTime))
} else {
Some(jobStatusCV)
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index c424b3b..186d444 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -65,11 +65,11 @@
"central",
Constant.DEFAULT,
InternalConfigHolder.get(MAVEN_REMOTE_URL))
- val remoteRepository =
- if (
+ val remoteRepository = {
+ val buildState =
InternalConfigHolder.get(MAVEN_AUTH_USER) == null || InternalConfigHolder.get(
MAVEN_AUTH_PASSWORD) == null
- ) {
+ if (buildState) {
builder.build()
} else {
val authentication = new AuthenticationBuilder()
@@ -78,6 +78,7 @@
.build()
builder.setAuthentication(authentication).build()
}
+ }
List(remoteRepository)
}
@@ -253,11 +254,11 @@
override def canFilter(jar: File): Boolean = true
override def isFiltered(name: String): Boolean = {
- if (name.startsWith("META-INF/")) {
- if (name.endsWith(".SF") || name.endsWith(".DSA") || name.endsWith(".RSA")) {
- logInfo(s"shade ignore file: $name")
- return true
- }
+ val isFilteredState = name.startsWith("META-INF/") && (name.endsWith(".SF") || name.endsWith(
".DSA") || name.endsWith(".RSA"))
+ if (isFilteredState) {
+ logInfo(s"shade ignore file: $name")
+ return true
}
false
}
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
index eb9b402..9c8e944 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
@@ -37,10 +37,10 @@
var lastTime: Long) {
// noinspection DuplicatedCode
def update(pullRsp: PullResponseItem): Unit = {
- if (
+ val nonPullUpdateState =
pullRsp == null || StringUtils.isBlank(pullRsp.getId) || StringUtils.isBlank(
pullRsp.getStatus)
- ) {
+ if (nonPullUpdateState) {
return
}
if (pullRsp.getStatus.contains("complete")) {
@@ -79,10 +79,10 @@
var lastTime: Long) {
// noinspection DuplicatedCode
def update(pushRsp: PushResponseItem): Unit = {
- if (
+ val nonPushUpdateState =
pushRsp == null || StringUtils.isBlank(pushRsp.getId) || StringUtils.isBlank(
pushRsp.getStatus)
- ) {
+ if (nonPushUpdateState) {
return
}
if (pushRsp.getStatus.contains("complete")) {
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index ba8af63..25849fa 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -138,10 +138,9 @@
val pullImageCmd = {
// when the register address prefix is explicitly identified on base image tag,
// the user's pre-saved docker register auth info would be used.
- if (
- dockerConf.registerAddress != null && !baseImageTag.startsWith(
- dockerConf.registerAddress)
- ) {
+ val pullImageCmdState = dockerConf.registerAddress != null && !baseImageTag.startsWith(
+ dockerConf.registerAddress)
+ if (pullImageCmdState) {
dockerClient.pullImageCmd(baseImageTag)
} else {
dockerClient.pullImageCmd(baseImageTag).withAuthConfig(dockerConf.toAuthConf)
@@ -229,7 +228,9 @@
registerAddress: String,
imageNamespace: String): String = {
var tagName = if (tag.contains("/")) tag else s"$imageNamespace/$tag"
- if (StringUtils.isNotBlank(registerAddress) && !tagName.startsWith(registerAddress)) {
+ val addRegisterAddressState =
+ StringUtils.isNotBlank(registerAddress) && !tagName.startsWith(registerAddress)
+ if (addRegisterAddressState) {
tagName = s"$registerAddress/$tagName"
}
tagName.toLowerCase
diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
index 7db1fbe..b6e9d7b 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
+++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
@@ -101,9 +101,9 @@
if (urlClassLoaderResource != null && JAR_PROTOCOL == urlClassLoaderResource.getProtocol) {
val spec = urlClassLoaderResource.getFile
val filename = new File(spec.substring(0, spec.indexOf("!/"))).getName
- if (
+ val matchState =
FLINK_PATTERN.matcher(filename).matches && !flinkResourcePattern.matcher(filename).matches
- ) {
+ if (matchState) {
return null
}
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index 12a2fdc..19ced0e 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -641,15 +641,13 @@
private[this] def isSingleLineComment(curChar: Char, nextChar: Char): Boolean = {
var flag = false
for (singleCommentPrefix <- singleLineCommentPrefixList) {
- if (singleCommentPrefix.length == 1) {
- if (curChar == singleCommentPrefix.charAt(0)) {
+ singleCommentPrefix.length match {
+ case 1 if curChar == singleCommentPrefix.charAt(0) => flag = true
+ case 2
+ if curChar == singleCommentPrefix.charAt(0) && nextChar == singleCommentPrefix.charAt(
+ 1) =>
flag = true
- }
- }
- if (singleCommentPrefix.length == 2) {
- if (curChar == singleCommentPrefix.charAt(0) && nextChar == singleCommentPrefix.charAt(1)) {
- flag = true
- }
+ case _ =>
}
}
flag