/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.flink.client.impl

import org.apache.streampark.common.enums.FlinkExecutionMode
import org.apache.streampark.common.util.{Logger, Utils}
import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper
import org.apache.streampark.flink.core.FlinkKubernetesClient
import org.apache.streampark.flink.kubernetes.KubernetesRetriever
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteModeEnum
import org.apache.streampark.flink.kubernetes.model.ClusterKey

import io.fabric8.kubernetes.api.model.{Config => _}
import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
import org.apache.flink.configuration._
import org.apache.flink.kubernetes.KubernetesClusterDescriptor
import org.apache.flink.kubernetes.configuration.{KubernetesConfigOptions, KubernetesDeploymentTarget}
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.ServiceExposedType
import org.apache.flink.kubernetes.kubeclient.{FlinkKubeClient, FlinkKubeClientFactory}

import scala.collection.convert.ImplicitConversions._
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}

/**
 * Kubernetes native session mode submit client.
 *
 * @deprecated
 * Please use [[KubernetesSessionClientV2]] instead.
 */
@Deprecated
object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Logger {
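
  /**
   * Submit a flink job to an existing kubernetes native session cluster. Delegates to
   * [[jobGraphSubmit]] or [[restApiSubmit]] through the parent trait's trySubmit.
   */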
@throws[Exception]
override def doSubmit(
submitRequest: SubmitRequest,
flinkConfig: Configuration): SubmitResponse = {
// require parameters
require(
StringUtils.isNotBlank(submitRequest.k8sSubmitParam.clusterId),
s"[flink-submit] submit flink job failed, clusterId is null, mode=${flinkConfig.get(DeploymentOptions.TARGET)}"
)
super.trySubmit(submitRequest, flinkConfig)(jobGraphSubmit, restApiSubmit)
}

  /** Submit a flink job to the session cluster via the JobManager REST API. */
@throws[Exception]
def restApiSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = {
Try {
// get jm rest url of flink session cluster
val clusterKey = ClusterKey(
FlinkK8sExecuteModeEnum.SESSION,
submitRequest.k8sSubmitParam.kubernetesNamespace,
submitRequest.k8sSubmitParam.clusterId)
val jmRestUrl = KubernetesRetriever
.retrieveFlinkRestUrl(clusterKey)
.getOrElse(throw new Exception(
s"[flink-submit] retrieve flink session rest url failed, clusterKey=$clusterKey"))
// submit job via rest api
val jobId =
FlinkSessionSubmitHelper.submitViaRestApi(jmRestUrl, submitRequest.userJarFile, flinkConfig)
SubmitResponse(clusterKey.clusterId, flinkConfig.toMap, jobId, jmRestUrl)
} match {
case Success(s) => s
case Failure(e) =>
logError(s"submit flink job fail in ${submitRequest.executionMode} mode")
throw e
}
}

  /** Submit a flink job by building the JobGraph locally and submitting it through the ClusterClient API. */
@throws[Exception]
def jobGraphSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = {
    // retrieve the k8s cluster and submit the flink job in session mode
var clusterDescriptor: KubernetesClusterDescriptor = null
var packageProgram: PackagedProgram = null
var client: ClusterClient[String] = null
try {
clusterDescriptor = getK8sClusterDescriptor(flinkConfig)
// build JobGraph
val programJobGraph = super.getJobGraph(submitRequest, flinkConfig)
packageProgram = programJobGraph._1
val jobGraph = programJobGraph._2
// retrieve client and submit JobGraph
client = clusterDescriptor
.retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
.getClusterClient
val submitResult = client.submitJob(jobGraph)
val jobId = submitResult.get().toString
val result =
SubmitResponse(client.getClusterId, flinkConfig.toMap, jobId, client.getWebInterfaceURL)
logInfo(
s"[flink-submit] flink job has been submitted. ${flinkConfIdentifierInfo(flinkConfig)}, jobId: $jobId")
result
} catch {
case e: Exception =>
logError(s"submit flink job fail in ${submitRequest.executionMode} mode")
e.printStackTrace()
throw e
} finally {
if (submitRequest.safePackageProgram) {
Utils.close(packageProgram)
}
Utils.close(clusterDescriptor, client)
}
}
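
  /** Cancel a flink job running on a kubernetes native session cluster. */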
override def doCancel(
cancelRequest: CancelRequest,
flinkConfig: Configuration): CancelResponse = {
flinkConfig.safeSet(
DeploymentOptions.TARGET,
FlinkExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
super.doCancel(cancelRequest, flinkConfig)
}
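
  /**
   * Start a flink session cluster on kubernetes, or reuse an existing one whose service is
   * already present, and return its web interface URL and cluster id.
   */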
def deploy(deployRequest: DeployRequest): DeployResponse = {
logInfo(
s"""
|--------------------------------------- kubernetes cluster start ---------------------------------------
| userFlinkHome : ${deployRequest.flinkVersion.flinkHome}
| flinkVersion : ${deployRequest.flinkVersion.version}
| execMode : ${deployRequest.executionMode.name()}
| clusterId : ${deployRequest.clusterId}
| namespace : ${deployRequest.k8sDeployParam.kubernetesNamespace}
| exposedType : ${deployRequest.k8sDeployParam.flinkRestExposedType}
| serviceAccount : ${deployRequest.k8sDeployParam.serviceAccount}
| flinkImage : ${deployRequest.k8sDeployParam.flinkImage}
| properties : ${deployRequest.properties.mkString(",")}
|--------------------------------------------------------------------------------------------------------
|""".stripMargin)
var clusterDescriptor: KubernetesClusterDescriptor = null
var client: ClusterClient[String] = null
var kubeClient: FlinkKubeClient = null
try {
val flinkConfig =
extractConfiguration(deployRequest.flinkVersion.flinkHome, deployRequest.properties)
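      // assemble the kubernetes session cluster configuration from the deploy request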
flinkConfig
.safeSet(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName)
.safeSet(
KubernetesConfigOptions.NAMESPACE,
deployRequest.k8sDeployParam.kubernetesNamespace)
.safeSet(
KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT,
deployRequest.k8sDeployParam.serviceAccount)
.safeSet(
KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
ServiceExposedType.valueOf(deployRequest.k8sDeployParam.flinkRestExposedType.getName))
.safeSet(KubernetesConfigOptions.CLUSTER_ID, deployRequest.clusterId)
.safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, deployRequest.k8sDeployParam.flinkImage)
.safeSet(
KubernetesConfigOptions.KUBE_CONFIG_FILE,
getDefaultKubernetesConf(deployRequest.k8sDeployParam.kubeConf))
.safeSet(
DeploymentOptionsInternal.CONF_DIR,
s"${deployRequest.flinkVersion.flinkHome}/conf")
val kubernetesClusterDescriptor = getK8sClusterDescriptorAndSpecification(flinkConfig)
clusterDescriptor = kubernetesClusterDescriptor._1
kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
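      // reuse the existing session cluster if its service is already present, otherwise deploy a new one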
if (
deployRequest.clusterId != null && kubeClientWrapper
.getService(deployRequest.clusterId)
.isPresent
) {
client = clusterDescriptor.retrieve(deployRequest.clusterId).getClusterClient
} else {
client =
clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
}
if (client.getWebInterfaceURL != null) {
DeployResponse(client.getWebInterfaceURL, client.getClusterId)
} else {
null
}
} catch {
case e: Exception =>
logError(s"start flink session fail in ${deployRequest.executionMode} mode")
e.printStackTrace()
throw e
} finally {
Utils.close(client, clusterDescriptor, kubeClient)
}
}
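
  /** Stop and clean up a flink session cluster on kubernetes if its service still exists. */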
def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
var kubeClient: FlinkKubeClient = null
try {
val flinkConfig = getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
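      // overlay user-provided dynamic properties on the default flink configuration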
      shutDownRequest.properties.foreach {
        case (k, v) if v != null => flinkConfig.setString(k, v.toString)
        case _ =>
      }
flinkConfig
.safeSet(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName)
.safeSet(
KubernetesConfigOptions.NAMESPACE,
shutDownRequest.kubernetesDeployParam.kubernetesNamespace)
.safeSet(
KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT,
shutDownRequest.kubernetesDeployParam.serviceAccount)
.safeSet(
KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
ServiceExposedType.valueOf(
shutDownRequest.kubernetesDeployParam.flinkRestExposedType.getName))
.safeSet(KubernetesConfigOptions.CLUSTER_ID, shutDownRequest.clusterId)
.safeSet(
KubernetesConfigOptions.CONTAINER_IMAGE,
shutDownRequest.kubernetesDeployParam.flinkImage)
.safeSet(
KubernetesConfigOptions.KUBE_CONFIG_FILE,
getDefaultKubernetesConf(shutDownRequest.kubernetesDeployParam.kubeConf))
kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
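      // only stop and clean up when the target session cluster service actually exists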
val stopAndCleanupState = shutDownRequest.clusterId != null && kubeClientWrapper
.getService(shutDownRequest.clusterId)
.isPresent
if (stopAndCleanupState) {
kubeClient.stopAndCleanupCluster(shutDownRequest.clusterId)
ShutDownResponse(shutDownRequest.clusterId)
} else {
null
}
} catch {
case e: Exception =>
logError(s"shutdown flink session fail in ${shutDownRequest.executionMode} mode")
e.printStackTrace()
throw e
} finally {
Utils.close(kubeClient)
}
}
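
  /** Trigger a savepoint for a flink job running on a kubernetes native session cluster. */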
override def doTriggerSavepoint(
triggerSavepointRequest: TriggerSavepointRequest,
flinkConfig: Configuration): SavepointResponse = {
flinkConfig.safeSet(
DeploymentOptions.TARGET,
FlinkExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
super.doTriggerSavepoint(triggerSavepointRequest, flinkConfig)
}
}