/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.flink.kubernetes.watcher
import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, JobStatusWatcherConfig, KubernetesRetriever}
import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
import org.apache.streampark.flink.kubernetes.model._
import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.history.FsJobArchivist
import org.apache.hc.client5.http.fluent.Request
import org.apache.hc.core5.util.Timeout
import org.json4s.{DefaultFormats, JNothing, JNull}
import org.json4s.JsonAST.JArray
import org.json4s.jackson.JsonMethods.parse
import javax.annotation.Nonnull
import javax.annotation.concurrent.ThreadSafe
import java.io.File
import java.nio.charset.StandardCharsets
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
import scala.collection.JavaConversions._
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration.DurationLong
import scala.language.{implicitConversions, postfixOps}
import scala.util.{Failure, Success, Try}
/**
 * Watcher that continuously monitors the status of Flink jobs running in kubernetes mode.
 * The tracked job identifiers come from FlinkTrackCachePool.trackIds, and the polled job
 * statuses are written back to FlinkTrackCachePool.jobStatuses.
 */
@ThreadSafe
class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfig.defaultConf)(
implicit val watchController: FlinkK8sWatchController,
implicit val eventBus: ChangeEventBus)
extends Logger
with FlinkWatcher {
private val trackTaskExecPool = Executors.newWorkStealingPool()
implicit private val trackTaskExecutor: ExecutionContextExecutorService =
ExecutionContext.fromExecutorService(trackTaskExecPool)
private val timerExec = Executors.newSingleThreadScheduledExecutor()
private var timerSchedule: ScheduledFuture[_] = _
/** start watcher process */
override def doStart(): Unit = {
timerSchedule =
timerExec.scheduleAtFixedRate(() => doWatch(), 0, conf.requestIntervalSec, TimeUnit.SECONDS)
logInfo("[flink-k8s] FlinkJobStatusWatcher started.")
}
/** stop watcher process */
override def doStop(): Unit = {
// interrupt all running threads
timerSchedule.cancel(true)
logInfo("[flink-k8s] FlinkJobStatusWatcher stopped.")
}
/** closes resource, relinquishing any underlying resources. */
override def doClose(): Unit = {
timerExec.shutdownNow()
trackTaskExecutor.shutdownNow()
logInfo("[flink-k8s] FlinkJobStatusWatcher closed.")
}
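  // A minimal lifecycle sketch (hypothetical caller; assumes a wired-up
  // FlinkK8sWatchController and ChangeEventBus are available as implicits):
  //
  //   val watcher = new FlinkJobStatusWatcher()
  //   watcher.doStart() // polls job status every conf.requestIntervalSec seconds
  //   // ... later ...
  //   watcher.doStop()  // cancels the periodic schedule
  //   watcher.doClose() // releases the timer and tracking executors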
  /** run a single round of flink job status tracking */
override def doWatch(): Unit = {
this.synchronized {
logDebug(
"[FlinkJobStatusWatcher]: Status monitoring process begins - " + Thread
.currentThread()
.getName)
// get all legal tracking ids
      val trackIds = Try(watchController.getAllWatchingIds())
        .filter(_.nonEmpty)
        .getOrElse(return)
// retrieve flink job status in thread pool
val tracksFuture: Set[Future[Option[JobStatusCV]]] = trackIds.map {
id =>
val future = Future {
id.executeMode match {
case SESSION => touchSessionJob(id)
case APPLICATION => touchApplicationJob(id)
}
}
future.onComplete(_.getOrElse(None) match {
case Some(jobState) =>
val trackId = id.copy(jobId = jobState.jobId)
val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
if (
latest == null || latest.jobState != jobState.jobState || latest.jobId != jobState.jobId
) {
// put job status to cache
watchController.jobStatuses.put(trackId, jobState)
// set jobId to trackIds
watchController.trackIds.update(trackId)
eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
}
if (FlinkJobState.isEndState(jobState.jobState)) {
// remove trackId from cache of job that needs to be untracked
watchController.unWatching(trackId)
if (trackId.executeMode == APPLICATION) {
watchController.endpoints.invalidate(trackId.toClusterKey)
}
}
case _ =>
})
future
}
      // block until all futures have completed or the timeout is reached
Try(Await.ready(Future.sequence(tracksFuture), conf.requestTimeoutSec seconds)).failed.map {
_ =>
          logWarn(
            s"[FlinkJobStatusWatcher] tracking flink job status on kubernetes mode timed out," +
              s" limitSeconds=${conf.requestTimeoutSec}," +
              s" trackIds=${trackIds.mkString(",")}")
}
logDebug(
"[FlinkJobStatusWatcher]: End of status monitoring process - " + Thread
.currentThread()
.getName)
}
}
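  // Change-detection sketch for the loop above (hypothetical values): an event is
  // published only when the polled status differs from the cached one.
  //
  //   val cached = watchController.jobStatuses.get(trackId) // e.g. jobState = RUNNING
  //   val polled = cached.copy(jobState = FlinkJobState.FINISHED)
  //   // cached.jobState != polled.jobState => the cache is updated and a
  //   // FlinkJobStatusChangeEvent(trackId, polled) is posted on the eventBus;
  //   // FINISHED is an end state, so the id is also un-watched afterwards.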
  /**
   * Get flink job status from a kubernetes-native-session cluster. When the
   * flink-cluster-client request fails, the job state is inferred as LOST or SILENT.
   *
   * This method can be called directly from outside without affecting the current
   * cachePool result.
   */
def touchSessionJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
val pollEmitTime = System.currentTimeMillis
val clusterId = trackId.clusterId
val namespace = trackId.namespace
val appId = trackId.appId
val jobId = trackId.jobId
val rsMap = touchSessionAllJob(clusterId, namespace, appId, trackId.groupId).toMap
val id = TrackId.onSession(namespace, clusterId, appId, jobId, trackId.groupId)
val jobState = rsMap.get(id).filter(_.jobState != FlinkJobState.SILENT).getOrElse {
val preCache = watchController.jobStatuses.get(id)
val state = inferSilentOrLostFromPreCache(preCache)
val nonFirstSilent =
state == FlinkJobState.SILENT && preCache != null && preCache.jobState == FlinkJobState.SILENT
if (nonFirstSilent) {
JobStatusCV(
jobState = state,
jobId = id.jobId,
pollEmitTime = preCache.pollEmitTime,
pollAckTime = preCache.pollAckTime)
} else {
JobStatusCV(
jobState = state,
jobId = id.jobId,
pollEmitTime = pollEmitTime,
pollAckTime = System.currentTimeMillis)
}
}
Some(jobState)
}
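  // Usage sketch for a one-off, cache-independent probe of a session job
  // (hypothetical identifiers; "watcher" is an instance of this class):
  //
  //   val id = TrackId.onSession("flink-ns", "session-cluster", 10001L, "<jobId>", "default")
  //   watcher.touchSessionJob(id).foreach(cv => println(s"state=${cv.jobState}"))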
  /**
   * Get the status of all flink jobs from a kubernetes-native-session cluster. An empty
   * array is returned when the k8s-client or flink-cluster-client request fails.
   *
   * This method can be called directly from outside without affecting the current
   * cachePool result.
   */
protected[kubernetes] def touchSessionAllJob(
@Nonnull clusterId: String,
@Nonnull namespace: String,
@Nonnull appId: Long,
@Nonnull groupId: String): Array[(TrackId, JobStatusCV)] = {
lazy val defaultResult = Array.empty[(TrackId, JobStatusCV)]
val pollEmitTime = System.currentTimeMillis
val jobDetails = listJobsDetails(ClusterKey(SESSION, namespace, clusterId))
.getOrElse(return defaultResult)
.jobs
if (jobDetails.isEmpty) {
defaultResult
} else {
jobDetails.map {
d =>
TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) -> d.toJobStatusCV(
pollEmitTime,
System.currentTimeMillis)
}
}
}
  /**
   * Get flink job status from a kubernetes-native-application cluster. When the
   * flink-cluster-client request fails, the job state is inferred from k8s events.
   *
   * This method can be called directly from outside without affecting the current
   * cachePool result.
   */
def touchApplicationJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
implicit val pollEmitTime: Long = System.currentTimeMillis
val clusterId = trackId.clusterId
val namespace = trackId.namespace
val jobDetails = listJobsDetails(ClusterKey(APPLICATION, namespace, clusterId))
if (jobDetails.isEmpty || jobDetails.get.jobs.isEmpty) {
inferApplicationFlinkJobStateFromK8sEvent(trackId)
} else {
Some(jobDetails.get.jobs.head.toJobStatusCV(pollEmitTime, System.currentTimeMillis))
}
}
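  // Usage sketch for an application-mode probe (hypothetical identifiers, assuming a
  // TrackId.onApplication factory symmetric to TrackId.onSession):
  //
  //   val id = TrackId.onApplication("flink-ns", "my-flink-app", 10002L, "<jobId>", "default")
  //   watcher.touchApplicationJob(id).foreach(cv => println(cv.jobState))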
  /** list flink job details */
private def listJobsDetails(clusterKey: ClusterKey): Option[JobDetails] = {
// get flink rest api
Try {
val clusterRestUrl =
watchController.getClusterRestUrl(clusterKey).filter(_.nonEmpty).getOrElse(return None)
// list flink jobs from rest api
callJobsOverviewsApi(clusterRestUrl)
}.getOrElse {
      logger.warn(
        "Failed to query remote flink jobs on the kubernetes-native-mode cluster; retrying with a refreshed cluster rest url.")
      val clusterRestUrl = watchController.refreshClusterRestUrl(clusterKey).getOrElse(return None)
      Try(callJobsOverviewsApi(clusterRestUrl)) match {
        case Success(s) =>
          logger.info("The retry succeeded.")
          s
        case Failure(e) =>
          logger.warn(s"The retry failed as well; giving up. error=${e.getMessage}")
None
}
}
}
  /** list flink job details from the rest api */
private def callJobsOverviewsApi(restUrl: String): Option[JobDetails] = {
val jobDetails = JobDetails.as(
Request
.get(s"$restUrl/jobs/overview")
.connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
.responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
.execute
.returnContent()
.asString(StandardCharsets.UTF_8))
jobDetails
}
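  // The Flink REST endpoint GET /jobs/overview returns a payload shaped like the
  // following (abridged), which JobDetails.as maps onto JobDetail/JobTask:
  //
  //   {
  //     "jobs": [{
  //       "jid": "c3b9f2e0...", "name": "my-job", "state": "RUNNING",
  //       "start-time": 1690000000000, "end-time": -1, "duration": 120000,
  //       "last-modification": 1690000100000,
  //       "tasks": { "total": 4, "created": 0, "running": 4, "finished": 0, ... }
  //     }]
  //   }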
/**
* Infer the current flink state from the last relevant k8s events. This method is only used for
* application-mode job inference in case of a failed JM rest request.
*/
private def inferApplicationFlinkJobStateFromK8sEvent(@Nonnull trackId: TrackId)(implicit
pollEmitTime: Long): Option[JobStatusCV] = {
// infer from k8s deployment and event
val latest: JobStatusCV = watchController.jobStatuses.get(trackId)
    logger.info(
      s"Local cache canceling flag: ${watchController.canceling.has(trackId)}, trackId: $trackId.")
val jobState = {
if (watchController.canceling.has(trackId)) FlinkJobState.CANCELED
else {
// whether deployment exists on kubernetes cluster
val isDeployExists =
KubernetesRetriever.isDeploymentExists(trackId.clusterId, trackId.namespace)
val deployStateOfTheError = KubernetesDeploymentHelper.getDeploymentStatusChanges(
trackId.namespace,
trackId.clusterId)
val isConnection = KubernetesDeploymentHelper.isTheK8sConnectionNormal()
if (isDeployExists) {
if (!deployStateOfTheError) {
logger.info("Task Enter the initialization process.")
FlinkJobState.K8S_INITIALIZING
} else if (isConnection) {
logger.info("Enter the task failure deletion process.")
KubernetesDeploymentHelper.watchPodTerminatedLog(
trackId.namespace,
trackId.clusterId,
trackId.jobId)
KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace, trackId.clusterId)
FlinkJobState.FAILED
} else {
inferSilentOrLostFromPreCache(latest)
}
} else if (isConnection) {
logger.info("The deployment is deleted and enters the task failure process.")
FlinkJobState.of(FlinkHistoryArchives.getJobStateFromArchiveFile(trackId.jobId))
} else {
inferSilentOrLostFromPreCache(latest)
}
}
}
val jobStatusCV = JobStatusCV(
jobState = jobState,
jobId = trackId.jobId,
pollEmitTime = pollEmitTime,
pollAckTime = System.currentTimeMillis)
if (
jobState == FlinkJobState.SILENT && latest != null && latest.jobState == FlinkJobState.SILENT
) {
Some(jobStatusCV.copy(pollEmitTime = latest.pollEmitTime, pollAckTime = latest.pollAckTime))
} else {
Some(jobStatusCV)
}
}
private[this] def inferSilentOrLostFromPreCache(preCache: JobStatusCV) = preCache match {
case preCache if preCache == null => FlinkJobState.SILENT
case preCache
if preCache.jobState == FlinkJobState.SILENT &&
System
.currentTimeMillis() - preCache.pollAckTime >= conf.silentStateJobKeepTrackingSec * 1000 =>
FlinkJobState.LOST
case _ => FlinkJobState.SILENT
}
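  // SILENT/LOST transition sketch: a job that goes missing is first marked SILENT;
  // once it stays SILENT past the keep-tracking window it is demoted to LOST
  // (hypothetical timeline with silentStateJobKeepTrackingSec = 60):
  //
  //   t=0s   poll miss, no cached status => SILENT (fresh timestamps recorded)
  //   t=30s  poll miss, cached SILENT    => SILENT (30s < 60s, timestamps kept)
  //   t=61s  poll miss, cached SILENT    => LOST   (61s >= 60s)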
}
object FlinkJobStatusWatcher {
private val effectEndStates: Seq[FlinkJobState.Value] =
FlinkJobState.endingStates.filter(_ != FlinkJobState.LOST)
  /**
   * Infer the flink job state to persist by reconciling the currently-observed state
   * with the previously persisted one.
   *
   * @param current
   *   current flink job state
   * @param previous
   *   previous flink job state from persistent storage
   */
def inferFlinkJobStateFromPersist(
current: FlinkJobState.Value,
previous: FlinkJobState.Value): FlinkJobState.Value = {
current match {
      case FlinkJobState.LOST =>
        // keep an already-effective end state; otherwise fall back to TERMINATED
        if (effectEndStates.contains(previous)) previous else FlinkJobState.TERMINATED
case FlinkJobState.POS_TERMINATED | FlinkJobState.TERMINATED =>
previous match {
case FlinkJobState.CANCELLING => FlinkJobState.CANCELED
case FlinkJobState.FAILING => FlinkJobState.FAILED
case _ =>
if (current == FlinkJobState.POS_TERMINATED) FlinkJobState.FINISHED
else FlinkJobState.TERMINATED
}
case _ => current
}
}
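  // A few concrete inferences (sketch, assuming FINISHED and CANCELED are among
  // FlinkJobState.endingStates):
  //
  //   inferFlinkJobStateFromPersist(LOST, FINISHED)          => FINISHED
  //   inferFlinkJobStateFromPersist(LOST, RUNNING)           => TERMINATED
  //   inferFlinkJobStateFromPersist(TERMINATED, CANCELLING)  => CANCELED
  //   inferFlinkJobStateFromPersist(POS_TERMINATED, FAILING) => FAILED
  //   inferFlinkJobStateFromPersist(POS_TERMINATED, RUNNING) => FINISHED
  //   inferFlinkJobStateFromPersist(RUNNING, FINISHED)       => RUNNING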
}
private[kubernetes] case class JobDetails(jobs: Array[JobDetail] = Array())
private[kubernetes] case class JobDetail(
jid: String,
name: String,
state: String,
startTime: Long,
endTime: Long,
duration: Long,
lastModification: Long,
tasks: JobTask) {
def toJobStatusCV(pollEmitTime: Long, pollAckTime: Long): JobStatusCV = {
JobStatusCV(
jobState = FlinkJobState.of(state),
jobId = jid,
jobName = name,
jobStartTime = startTime,
jobEndTime = endTime,
duration = duration,
taskTotal = tasks.total,
pollEmitTime = pollEmitTime,
pollAckTime = pollAckTime
)
}
}
private[kubernetes] case class JobTask(
total: Int,
created: Int,
scheduled: Int,
deploying: Int,
running: Int,
finished: Int,
canceling: Int,
canceled: Int,
failed: Int,
reconciling: Int,
initializing: Int)
private[kubernetes] object JobDetails {
@transient
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
def as(json: String): Option[JobDetails] = {
Try(parse(json)) match {
case Success(ok) =>
ok \ "jobs" match {
case JNothing | JNull => None
case JArray(arr) =>
val details = arr
.map(
x => {
val task = x \ "tasks"
JobDetail(
(x \ "jid").extractOpt[String].orNull,
(x \ "name").extractOpt[String].orNull,
(x \ "state").extractOpt[String].orNull,
(x \ "start-time").extractOpt[Long].getOrElse(0),
(x \ "end-time").extractOpt[Long].getOrElse(0),
(x \ "duration").extractOpt[Long].getOrElse(0),
(x \ "last-modification").extractOpt[Long].getOrElse(0),
JobTask(
(task \ "total").extractOpt[Int].getOrElse(0),
(task \ "created").extractOpt[Int].getOrElse(0),
(task \ "scheduled").extractOpt[Int].getOrElse(0),
(task \ "deploying").extractOpt[Int].getOrElse(0),
(task \ "running").extractOpt[Int].getOrElse(0),
(task \ "finished").extractOpt[Int].getOrElse(0),
(task \ "canceling").extractOpt[Int].getOrElse(0),
(task \ "canceled").extractOpt[Int].getOrElse(0),
(task \ "failed").extractOpt[Int].getOrElse(0),
(task \ "reconciling").extractOpt[Int].getOrElse(0),
(task \ "initializing").extractOpt[Int].getOrElse(0)
)
)
})
.toArray
Some(JobDetails(details))
case _ => None
}
case Failure(_) => None
}
}
}
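// Parsing sketch for JobDetails.as (hypothetical minimal payload): missing numeric
// fields default to 0, and a missing or null "jobs" key yields None.
//
//   val json =
//     """{"jobs":[{"jid":"abc","name":"demo","state":"RUNNING","tasks":{"total":2}}]}"""
//   JobDetails.as(json).map(_.jobs.head.state) // Some("RUNNING")
//   JobDetails.as("""{"other": 1}""")          // None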
private[kubernetes] object FlinkHistoryArchives {
@transient
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
private[this] val FAILED_STATE = "FAILED"
def getJobStateFromArchiveFile(jobId: String): String = Try {
require(jobId != null, "[StreamPark] getJobStateFromArchiveFile: JobId cannot be null.")
val archivePath = new Path(Workspace.ARCHIVES_FILE_PATH, jobId)
FsJobArchivist.getArchivedJsons(archivePath) match {
case r if r.isEmpty => FAILED_STATE
case r =>
r.foreach {
a =>
if (a.getPath == s"/jobs/$jobId/exceptions") {
Try(parse(a.getJson)) match {
case Success(ok) =>
val log = (ok \ "root-exception").extractOpt[String].orNull
if (log != null) {
val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
val file = new File(path)
Files.asCharSink(file, Charsets.UTF_8).write(log)
}
case _ =>
}
} else if (a.getPath == "/jobs/overview") {
Try(parse(a.getJson)) match {
case Success(ok) =>
ok \ "jobs" match {
case JNothing | JNull =>
case JArray(arr) =>
arr.foreach(
x => {
val jid = (x \ "jid").extractOpt[String].orNull
if (jid == jobId) {
return (x \ "state").extractOpt[String].orNull
}
})
case _ =>
}
case Failure(_) =>
}
}
}
FAILED_STATE
}
}.getOrElse(FAILED_STATE)
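  // Behavior sketch (hypothetical job id): the "/jobs/overview" entry in the archive
  // supplies the final state, while a "/jobs/<jobId>/exceptions" entry, if present,
  // gets its root-exception dumped to the job error log as a side effect; any failure
  // along the way degrades the result to FAILED_STATE.
  //
  //   FlinkHistoryArchives.getJobStateFromArchiveFile("<jobId>") // e.g. "FINISHED"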
}