/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.flink.client.impl

import org.apache.streampark.common.Constant
import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.enums.FlinkDevelopmentMode
import org.apache.streampark.common.fs.FsOperator
import org.apache.streampark.common.util.{FileUtils, HdfsUtils, Utils}
import org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse

import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration._
import org.apache.flink.python.PythonOptions
import org.apache.flink.runtime.util.HadoopUtils
import org.apache.flink.yarn.configuration.YarnConfigOptions
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ApplicationId

import java.security.PrivilegedAction
import java.util
import java.util.Collections
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

/** Submits Flink jobs to YARN in application mode. */
object YarnApplicationClient extends YarnClientTrait {
private[this] lazy val workspace = Workspace.remote
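/**
 * Prepares the Flink configuration for a YARN application-mode deployment:
 * validates Kerberos credentials, registers the provided lib dirs and the
 * shaded job jar, sets the YARN application name/type, and wires up the
 * PyFlink runtime options when the job is a Python job.
 */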
override def setConfig(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = {
val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
submitRequest.flinkVersion.flinkHome)
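// Fail fast when Hadoop security (Kerberos) is enabled but the login user has
// no valid credentials or delegation tokens.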
val currentUser = UserGroupInformation.getCurrentUser
logDebug(s"UserGroupInformation currentUser: $currentUser")
if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
logDebug(s"kerberos Security is Enabled...")
val useTicketCache =
flinkDefaultConfiguration.get(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE)
if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
throw new RuntimeException(
s"Hadoop security with Kerberos is enabled but the login user $currentUser does not have Kerberos credentials or delegation tokens!")
}
}
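// Directories exposed to YARN via yarn.provided.lib.dirs: Flink's lib/plugins
// plus StreamPark's app jars/plugins; Flink SQL jobs also need the
// version-matched shims jar and, if present, the per-job lib directory on HDFS.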
val providedLibs = {
val array = ListBuffer(
submitRequest.hdfsWorkspace.flinkLib,
submitRequest.hdfsWorkspace.flinkPlugins,
submitRequest.hdfsWorkspace.appJars,
submitRequest.hdfsWorkspace.appPlugins
)
submitRequest.developmentMode match {
case FlinkDevelopmentMode.FLINK_SQL =>
array += s"${workspace.APP_SHIMS}/flink-${submitRequest.flinkVersion.majorVersion}"
val jobLib = s"${workspace.APP_WORKSPACE}/${submitRequest.id}/lib"
if (HdfsUtils.exists(jobLib)) {
array += jobLib
}
case _ =>
}
array.toList
}
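// `safeSet` comes from the client trait; it is assumed here to skip null/empty
// values, so unset request fields leave the Flink defaults untouched.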
flinkConfig
// yarn.provided.lib.dirs
.safeSet(YarnConfigOptions.PROVIDED_LIB_DIRS, providedLibs.asJava)
// flinkDistJar
.safeSet(YarnConfigOptions.FLINK_DIST_JAR, submitRequest.hdfsWorkspace.flinkDistJar)
// pipeline.jars
.safeSet(
PipelineOptions.JARS,
Collections.singletonList(
submitRequest.buildResult.asInstanceOf[ShadedBuildResponse].shadedJarPath))
// yarn application name
.safeSet(YarnConfigOptions.APPLICATION_NAME, submitRequest.effectiveAppName)
// yarn application type
.safeSet(YarnConfigOptions.APPLICATION_TYPE, submitRequest.applicationType.getName)
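// PyFlink jobs: ship the Python virtual environment and the directory holding
// the user's .py entry script, and point Flink's python.* options at them.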
if (submitRequest.developmentMode == FlinkDevelopmentMode.PYFLINK) {
val pyVenv: String = workspace.APP_PYTHON_VENV
if (!FsOperator.hdfs.exists(pyVenv)) {
throw new RuntimeException(s"Python venv $pyVenv does not exist")
}
val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib"
if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) {
flinkConfig.safeSet(PipelineOptions.JARS, util.Arrays.asList(localLib))
}
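// Ship the directory containing the Python entry script so it is localized
// into the YARN containers.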
// yarn.ship-files
val shipFiles = new util.ArrayList[String]()
shipFiles.add(submitRequest.userJarFile.getParentFile.getAbsolutePath)
flinkConfig
.safeSet(YarnConfigOptions.SHIP_FILES, shipFiles)
// python.files
.safeSet(PythonOptions.PYTHON_FILES, submitRequest.userJarFile.getParentFile.getName)
// python.archives
.safeSet(PythonOptions.PYTHON_ARCHIVES, pyVenv)
// python.client.executable
.safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
// python.executable
.safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
// flinkConfig.get may return an immutable list (hence the historical
// java.lang.UnsupportedOperationException): copy it into a fresh ArrayList
// before appending the PyFlink entry-module arguments.
val args: util.List[String] = flinkConfig.get(ApplicationConfiguration.APPLICATION_ARGS)
val argsList = new util.ArrayList[String]()
if (args != null) argsList.addAll(args)
argsList.add("-pym")
argsList.add(submitRequest.userJarFile.getName.dropRight(Constant.PYTHON_SUFFIX.length))
flinkConfig.safeSet(ApplicationConfiguration.APPLICATION_ARGS, argsList)
}
logInfo(s"""
|------------------------------------------------------------------
|Effective submit configuration: $flinkConfig
|------------------------------------------------------------------
|""".stripMargin)
}
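/**
 * Deploys the application cluster on YARN and returns the resulting
 * application id and JobManager web URL. When Kerberos is disabled and a
 * hadoopUser is given, the deployment is executed as that proxy user.
 */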
override def doSubmit(
submitRequest: SubmitRequest,
flinkConfig: Configuration): SubmitResponse = {
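// Without Kerberos, optionally submit on behalf of the requested Hadoop user
// via proxy-user impersonation.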
val currentUser = UserGroupInformation.getCurrentUser
var proxyUserUgi: UserGroupInformation = currentUser
if (!HadoopUtils.isKerberosSecurityEnabled(currentUser)
&& StringUtils.isNotEmpty(submitRequest.hadoopUser)) {
proxyUserUgi = UserGroupInformation.createProxyUser(submitRequest.hadoopUser, currentUser)
}
proxyUserUgi.doAs[SubmitResponse](new PrivilegedAction[SubmitResponse] {
override def run(): SubmitResponse = {
val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
val clientFactory =
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
val clusterDescriptor = clientFactory.createClusterDescriptor(flinkConfig)
var clusterClient: ClusterClient[ApplicationId] = null
try {
val clusterSpecification = clientFactory.getClusterSpecification(flinkConfig)
logInfo(s"""
|------------------------<<specification>>-------------------------
|$clusterSpecification
|------------------------------------------------------------------
|""".stripMargin)
val applicationConfiguration = ApplicationConfiguration.fromConfiguration(flinkConfig)
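// In application mode the user's main() runs on the JobManager, so deploying
// the cluster is all that is needed to start the job.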
clusterClient = clusterDescriptor
.deployApplicationCluster(clusterSpecification, applicationConfiguration)
.getClusterClient
val applicationId = clusterClient.getClusterId
val jobManagerUrl = clusterClient.getWebInterfaceURL
logInfo(s"""
|-------------------------<<applicationId>>------------------------
|Flink Job Started: applicationId: $applicationId
|------------------------------------------------------------------
|""".stripMargin)
SubmitResponse(applicationId.toString, flinkConfig.toMap, jobManagerUrl = jobManagerUrl)
} finally {
Utils.close(clusterDescriptor, clusterClient)
}
}
})
}
}