blob: 916096f2595bb6d08fc14fa80214e20a26f45d27 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.flink.kubernetes
import org.apache.streampark.common.conf.ConfigKeys
import org.apache.streampark.common.util.ImplicitsUtils._
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteModeEnum
import org.apache.streampark.flink.kubernetes.ingress.IngressController
import org.apache.streampark.flink.kubernetes.model.ClusterKey
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient, KubernetesClientException}
import org.apache.flink.client.cli.ClientOptions
import org.apache.flink.client.deployment.{ClusterClientFactory, DefaultClusterClientServiceLoader}
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{Configuration, DeploymentOptions, RestOptions}
import org.apache.flink.kubernetes.KubernetesClusterDescriptor
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
import org.apache.hc.core5.util.Timeout
import javax.annotation.Nullable
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
object KubernetesRetriever extends Logger {
// see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT}
val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
Timeout.ofMilliseconds(ClientOptions.CLIENT_TIMEOUT.defaultValue().toMillis)
// see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
/** get new KubernetesClient */
@throws(classOf[KubernetesClientException])
def newK8sClient(): KubernetesClient = {
new DefaultKubernetesClient()
}
/** check connection of kubernetes cluster */
def checkK8sConnection(): Boolean = {
Try(newK8sClient().getVersion != null).getOrElse(false)
}
private val clusterClientServiceLoader = new DefaultClusterClientServiceLoader()
/** get new flink cluster client of kubernetes mode */
def newFinkClusterClient(
clusterId: String,
@Nullable namespace: String,
executeMode: FlinkK8sExecuteModeEnum.Value): Option[ClusterClient[String]] = {
// build flink config
val flinkConfig = new Configuration()
flinkConfig.setString(DeploymentOptions.TARGET, executeMode.toString)
flinkConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId)
flinkConfig.set(ClientOptions.CLIENT_TIMEOUT, ClientOptions.CLIENT_TIMEOUT.defaultValue())
flinkConfig.set(
RestOptions.AWAIT_LEADER_TIMEOUT,
RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
flinkConfig.set(RestOptions.RETRY_MAX_ATTEMPTS, RestOptions.RETRY_MAX_ATTEMPTS.defaultValue())
if (Try(namespace.isEmpty).getOrElse(true)) {
flinkConfig.setString(
KubernetesConfigOptions.NAMESPACE,
KubernetesConfigOptions.NAMESPACE.defaultValue())
} else {
flinkConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace)
}
// retrieve flink cluster client
val clientFactory: ClusterClientFactory[String] =
clusterClientServiceLoader.getClusterClientFactory(flinkConfig)
val clusterProvider: KubernetesClusterDescriptor =
clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[KubernetesClusterDescriptor]
Try {
clusterProvider
.retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
.getClusterClient
} match {
case Success(v) => Some(v)
case Failure(e) =>
logError(s"Get flinkClient error, the error is: $e")
None
}
}
/**
* check whether deployment exists on kubernetes cluster
*
* @param name
* deployment name
* @param namespace
* deployment namespace
*/
def isDeploymentExists(name: String, namespace: String): Boolean = {
KubernetesRetriever
.newK8sClient()
.autoClose(
client =>
client
.apps()
.deployments()
.inNamespace(namespace)
.withLabel("type", ConfigKeys.FLINK_NATIVE_KUBERNETES_LABEL)
.list()
.getItems
.asScala
.exists(e => e.getMetadata.getName == name))(_ => false)
}
/** retrieve flink jobManager rest url */
def retrieveFlinkRestUrl(clusterKey: ClusterKey): Option[String] = {
KubernetesRetriever
.newFinkClusterClient(clusterKey.clusterId, clusterKey.namespace, clusterKey.executeMode)
.getOrElse(return None)
.autoClose(
client => {
val url =
IngressController.ingressUrlAddress(clusterKey.namespace, clusterKey.clusterId, client)
logger.info(s"retrieve flink jobManager rest url: $url")
Some(url)
})
}
}