| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.streampark.flink.client.bean |
| |
| import org.apache.streampark.common.Constant |
| import org.apache.streampark.common.conf.{FlinkVersion, Workspace} |
| import org.apache.streampark.common.conf.ConfigKeys._ |
| import org.apache.streampark.common.enums._ |
| import org.apache.streampark.common.util.{AssertUtils, DeflaterUtils, HdfsUtils, PropertiesUtils} |
| import org.apache.streampark.flink.packer.pipeline.{BuildResult, ShadedBuildResponse} |
| import org.apache.streampark.flink.util.FlinkUtils |
| import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper |
| |
| import org.apache.commons.io.FileUtils |
| import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, SavepointRestoreSettings} |
| |
| import javax.annotation.Nullable |
| |
| import java.io.File |
| import java.net.URL |
| import java.util.{Map => JavaMap} |
| |
| import scala.collection.convert.ImplicitConversions._ |
| import scala.util.Try |
| |
| case class SubmitRequest( |
| flinkVersion: FlinkVersion, |
| executionMode: FlinkExecutionMode, |
| properties: JavaMap[String, Any], |
| flinkYaml: String, |
| developmentMode: FlinkDevelopmentMode, |
| id: Long, |
| jobId: String, |
| appName: String, |
| appConf: String, |
| applicationType: ApplicationType, |
| savePoint: String, |
| restoreMode: FlinkRestoreMode, |
| args: String, |
| @Nullable hadoopUser: String, |
| @Nullable buildResult: BuildResult, |
| @Nullable k8sSubmitParam: KubernetesSubmitParam, |
| @Nullable extraParameter: JavaMap[String, Any]) { |
| |
| private[this] lazy val appProperties: Map[String, String] = getParameterMap( |
| KEY_FLINK_PROPERTY_PREFIX) |
| |
| lazy val appOption: Map[String, String] = getParameterMap(KEY_FLINK_OPTION_PREFIX) |
| |
| lazy val appMain: String = this.developmentMode match { |
| case FlinkDevelopmentMode.FLINK_SQL => Constant.STREAMPARK_FLINKSQL_CLIENT_CLASS |
| case FlinkDevelopmentMode.PYFLINK => Constant.PYTHON_FLINK_DRIVER_CLASS_NAME |
| case _ => appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS) |
| } |
| |
| lazy val effectiveAppName: String = |
| if (this.appName == null) appProperties(KEY_FLINK_APP_NAME) else this.appName |
| |
| lazy val libs: List[URL] = { |
| val path = s"${Workspace.local.APP_WORKSPACE}/$id/lib" |
| Try(new File(path).listFiles().map(_.toURI.toURL).toList).getOrElse(List.empty[URL]) |
| } |
| |
| lazy val classPaths: List[URL] = flinkVersion.flinkLibs ++ libs |
| |
| lazy val flinkSQL: String = extraParameter.get(KEY_FLINK_SQL()).toString |
| |
| lazy val allowNonRestoredState: Boolean = Try( |
| properties.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean) |
| .getOrElse(false) |
| |
| lazy val savepointRestoreSettings: SavepointRestoreSettings = { |
| savePoint match { |
| case sp if Try(sp.isEmpty).getOrElse(true) => SavepointRestoreSettings.none |
| case sp => SavepointRestoreSettings.forPath(sp, allowNonRestoredState) |
| } |
| } |
| |
| lazy val userJarFile: File = { |
| executionMode match { |
| case FlinkExecutionMode.KUBERNETES_NATIVE_APPLICATION => null |
| case _ => |
| checkBuildResult() |
| new File(buildResult.asInstanceOf[ShadedBuildResponse].shadedJarPath) |
| } |
| } |
| |
| lazy val safePackageProgram: Boolean = { |
| // ref FLINK-21164 FLINK-9844 packageProgram.close() |
| // must be flink 1.12.2 and above |
| flinkVersion.version.split("\\.").map(_.trim.toInt) match { |
| case Array(a, b, c) if a >= 1 => b > 12 || (b == 12 && c >= 2) |
| case _ => false |
| } |
| } |
| |
| private[this] def getParameterMap(prefix: String = ""): Map[String, String] = { |
| if (this.appConf == null) { |
| return Map.empty[String, String] |
| } |
| val format = this.appConf.substring(0, 7) |
| if (format == "json://") { |
| val json = this.appConf.drop(7) |
| new ObjectMapper() |
| .readValue[JavaMap[String, String]](json, classOf[JavaMap[String, String]]) |
| .toMap |
| .filter(_._2 != null) |
| } else { |
| lazy val content = DeflaterUtils.unzipString(this.appConf.trim.drop(7)) |
| val map = format match { |
| case "yaml://" => PropertiesUtils.fromYamlText(content) |
| case "conf://" => PropertiesUtils.fromHoconText(content) |
| case "prop://" => PropertiesUtils.fromPropertiesText(content) |
| case "hdfs://" => |
| /** |
| * If the configuration file is HDFS mode, you need to copy the HDFS related configuration |
| * file to resources. |
| */ |
| val text = HdfsUtils.read(this.appConf) |
| val extension = this.appConf.split("\\.").last.toLowerCase |
| extension match { |
| case "yml" | "yaml" => PropertiesUtils.fromYamlText(text) |
| case "conf" => PropertiesUtils.fromHoconText(text) |
| case "properties" => PropertiesUtils.fromPropertiesText(text) |
| case _ => |
| throw new IllegalArgumentException( |
| "[StreamPark] Usage: application config format error,must be [yaml|conf|properties]") |
| } |
| case _ => |
| throw new IllegalArgumentException("[StreamPark] application config format error.") |
| } |
| map |
| .filter(_._1.startsWith(prefix)) |
| .filter(_._2.nonEmpty) |
| .map(x => x._1.drop(prefix.length) -> x._2) |
| } |
| } |
| |
| private[client] lazy val hdfsWorkspace = { |
| |
| /** |
| * The flink version and configuration in the native flink and hdfs must be kept exactly the |
| * same. |
| */ |
| val workspace = Workspace.remote |
| val flinkHome = flinkVersion.flinkHome |
| val flinkHomeDir = new File(flinkHome) |
| val flinkName = if (FileUtils.isSymlink(flinkHomeDir)) { |
| flinkHomeDir.getCanonicalFile.getName |
| } else { |
| flinkHomeDir.getName |
| } |
| val flinkHdfsHome = s"${workspace.APP_FLINK}/$flinkName" |
| HdfsWorkspace( |
| flinkName, |
| flinkHome, |
| flinkLib = s"$flinkHdfsHome/lib", |
| flinkPlugins = s"$flinkHdfsHome/plugins", |
| flinkDistJar = FlinkUtils.getFlinkDistJar(flinkHome), |
| appJars = workspace.APP_JARS, |
| appPlugins = workspace.APP_PLUGINS |
| ) |
| } |
| |
| @throws[Exception] |
| def checkBuildResult(): Unit = { |
| executionMode match { |
| case FlinkExecutionMode.KUBERNETES_NATIVE_SESSION => |
| AssertUtils.required( |
| buildResult != null, |
| s"[flink-submit] current job: ${this.effectiveAppName} was not yet built, buildResult is empty" + |
| s",clusterId=${k8sSubmitParam.clusterId}," + |
| s",namespace=${k8sSubmitParam.kubernetesNamespace}" |
| ) |
| AssertUtils.required( |
| buildResult.pass, |
| s"[flink-submit] current job ${this.effectiveAppName} build failed, clusterId" + |
| s",clusterId=${k8sSubmitParam.clusterId}," + |
| s",namespace=${k8sSubmitParam.kubernetesNamespace}" |
| ) |
| case _ => |
| AssertUtils.required( |
| this.buildResult != null, |
| s"[flink-submit] current job: ${this.effectiveAppName} was not yet built, buildResult is empty") |
| AssertUtils.required( |
| this.buildResult.pass, |
| s"[flink-submit] current job ${this.effectiveAppName} build failed, please check") |
| } |
| } |
| |
| } |
| |
| /** |
| * @param flinkName |
| * @param flinkHome |
| * @param flinkDistJar |
| * @param flinkLib |
| * @param flinkPlugins |
| * @param appJars |
| * @param appPlugins |
| */ |
| case class HdfsWorkspace( |
| flinkName: String, |
| flinkHome: String, |
| flinkDistJar: String, |
| flinkLib: String, |
| flinkPlugins: String, |
| appJars: String, |
| appPlugins: String) |