| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.status |
| |
| import java.util.Date |
| import java.util.concurrent.ConcurrentHashMap |
| import java.util.function.Function |
| |
| import scala.collection.JavaConverters._ |
| import scala.collection.mutable.HashMap |
| |
| import org.apache.spark._ |
| import org.apache.spark.executor.TaskMetrics |
| import org.apache.spark.internal.Logging |
| import org.apache.spark.scheduler._ |
| import org.apache.spark.status.api.v1 |
| import org.apache.spark.storage._ |
| import org.apache.spark.ui.SparkUI |
| import org.apache.spark.ui.scope._ |
| |
| /** |
| * A Spark listener that writes application information to a data store. The types written to the |
| * store are defined in the `storeTypes.scala` file and are based on the public REST API. |
| * |
| * @param lastUpdateTime When replaying logs, the log's last update time, so that the duration of |
| * unfinished tasks can be more accurately calculated (see SPARK-21922). |
| */ |
private[spark] class AppStatusListener(
    kvstore: ElementTrackingStore,
    conf: SparkConf,
    live: Boolean,
    lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {

  import config._

  // Spark version of the application; overwritten by a SparkListenerLogStart event when replaying.
  private var sparkVersion = SPARK_VERSION
  // Most recent application info; remains null until onApplicationStart fires.
  private var appInfo: v1.ApplicationInfo = null
  // Running counters of completed jobs / stages, re-written to the store on every change.
  private var appSummary = new AppSummary(0, 0)
  // Value of "spark.task.cpus"; refreshed from the environment-update event.
  private var coresPerTask: Int = 1

  // How often to update live entities. -1 means "never update" when replaying applications,
  // meaning only the last write will happen. For live applications, this avoids a few
  // operations that we can live without when rapidly processing incoming task events.
  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L

  /**
   * Minimum time elapsed before stale UI data is flushed. This avoids UI staleness when incoming
   * task events are not fired frequently.
   */
  private val liveUpdateMinFlushPeriod = conf.get(LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD)

  private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE)
  private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)

  // Keep track of live entities, so that task metrics can be efficiently updated (without
  // causing too many writes to the underlying store, and other expensive operations).
  // liveStages is concurrent because it is also read from other threads (e.g. activeStages()).
  private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
  private val liveJobs = new HashMap[Int, LiveJob]()
  private val liveExecutors = new HashMap[String, LiveExecutor]()
  private val liveTasks = new HashMap[Long, LiveTask]()
  private val liveRDDs = new HashMap[Int, LiveRDD]()
  private val pools = new HashMap[String, SchedulerPool]()
  // Keep the active executor count as a separate variable to avoid having to do synchronization
  // around liveExecutors.
  @volatile private var activeExecutorCount = 0

  /** The last time when flushing `LiveEntity`s. This is to avoid flushing too frequently. */
  private var lastFlushTimeNs = System.nanoTime()

  // Register eviction triggers so the store stays within its configured retention limits.
  kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
    { count => cleanupExecutors(count) }

  kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>
    cleanupJobs(count)
  }

  kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count =>
    cleanupStages(count)
  }

  // When replaying event logs (non-live), force-write every live entity on store flush so the
  // final state is persisted even though periodic live updates are disabled.
  kvstore.onFlush {
    if (!live) {
      val now = System.nanoTime()
      flush(update(_, now))
    }
  }
| |
| override def onOtherEvent(event: SparkListenerEvent): Unit = event match { |
| case SparkListenerLogStart(version) => sparkVersion = version |
| case _ => |
| } |
| |
  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    assert(event.appId.isDefined, "Application without IDs are not supported.")

    // Build the initial (incomplete) attempt: end time and duration use -1 sentinels until
    // onApplicationEnd fills them in.
    val attempt = v1.ApplicationAttemptInfo(
      event.appAttemptId,
      new Date(event.time),
      new Date(-1),
      new Date(event.time),
      -1L,
      event.sparkUser,
      false,
      sparkVersion)

    appInfo = v1.ApplicationInfo(
      event.appId.get,
      event.appName,
      None,
      None,
      None,
      None,
      Seq(attempt))

    kvstore.write(new ApplicationInfoWrapper(appInfo))
    kvstore.write(appSummary)

    // Update the driver block manager with logs from this event. The SparkContext initialization
    // code registers the driver before this event is sent.
    event.driverLogs.foreach { logs =>
      val driver = liveExecutors.get(SparkContext.DRIVER_IDENTIFIER)
        .orElse(liveExecutors.get(SparkContext.LEGACY_DRIVER_IDENTIFIER))
      driver.foreach { d =>
        d.executorLogs = logs.toMap
        update(d, System.nanoTime())
      }
    }
  }
| |
| override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { |
| val details = event.environmentDetails |
| |
| val jvmInfo = Map(details("JVM Information"): _*) |
| val runtime = new v1.RuntimeInfo( |
| jvmInfo.get("Java Version").orNull, |
| jvmInfo.get("Java Home").orNull, |
| jvmInfo.get("Scala Version").orNull) |
| |
| val envInfo = new v1.ApplicationEnvironmentInfo( |
| runtime, |
| details.getOrElse("Spark Properties", Nil), |
| details.getOrElse("System Properties", Nil), |
| details.getOrElse("Classpath Entries", Nil)) |
| |
| coresPerTask = envInfo.sparkProperties.toMap.get("spark.task.cpus").map(_.toInt) |
| .getOrElse(coresPerTask) |
| |
| kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo)) |
| } |
| |
| override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { |
| val old = appInfo.attempts.head |
| val attempt = v1.ApplicationAttemptInfo( |
| old.attemptId, |
| old.startTime, |
| new Date(event.time), |
| new Date(event.time), |
| event.time - old.startTime.getTime(), |
| old.sparkUser, |
| true, |
| old.appSparkVersion) |
| |
| appInfo = v1.ApplicationInfo( |
| appInfo.id, |
| appInfo.name, |
| None, |
| None, |
| None, |
| None, |
| Seq(attempt)) |
| kvstore.write(new ApplicationInfoWrapper(appInfo)) |
| } |
| |
| override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { |
| // This needs to be an update in case an executor re-registers after the driver has |
| // marked it as "dead". |
| val exec = getOrCreateExecutor(event.executorId, event.time) |
| exec.host = event.executorInfo.executorHost |
| exec.isActive = true |
| exec.totalCores = event.executorInfo.totalCores |
| exec.maxTasks = event.executorInfo.totalCores / coresPerTask |
| exec.executorLogs = event.executorInfo.logUrlMap |
| liveUpdate(exec, System.nanoTime()) |
| } |
| |
  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
    liveExecutors.remove(event.executorId).foreach { exec =>
      val now = System.nanoTime()
      // Clamp at zero to guard against duplicate remove events.
      activeExecutorCount = math.max(0, activeExecutorCount - 1)
      exec.isActive = false
      exec.removeTime = new Date(event.time)
      exec.removeReason = event.reason
      // `last = true` forces the final state to be written regardless of throttling.
      update(exec, now, last = true)

      // Remove all RDD distributions that reference the removed executor, in case there wasn't
      // a corresponding event.
      liveRDDs.values.foreach { rdd =>
        if (rdd.removeDistribution(exec)) {
          update(rdd, now)
        }
      }
    }
  }
| |
| override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = { |
| updateBlackListStatus(event.executorId, true) |
| } |
| |
| override def onExecutorBlacklistedForStage( |
| event: SparkListenerExecutorBlacklistedForStage): Unit = { |
| val now = System.nanoTime() |
| |
| Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage => |
| setStageBlackListStatus(stage, now, event.executorId) |
| } |
| liveExecutors.get(event.executorId).foreach { exec => |
| addBlackListedStageTo(exec, event.stageId, now) |
| } |
| } |
| |
  override def onNodeBlacklistedForStage(event: SparkListenerNodeBlacklistedForStage): Unit = {
    val now = System.nanoTime()

    // Implicitly blacklist every available executor for the stage associated with this node
    Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
      // NOTE(review): this filter uses `_.host` while the loop below uses `_.hostname`. If those
      // accessors can disagree (e.g. `host` not yet set when only a block manager registration
      // was seen), the two passes would cover different executor sets — confirm which accessor
      // is intended and make them consistent.
      val executorIds = liveExecutors.values.filter(_.host == event.hostId).map(_.executorId).toSeq
      setStageBlackListStatus(stage, now, executorIds: _*)
    }
    liveExecutors.values.filter(_.hostname == event.hostId).foreach { exec =>
      addBlackListedStageTo(exec, event.stageId, now)
    }
  }
| |
| private def addBlackListedStageTo(exec: LiveExecutor, stageId: Int, now: Long): Unit = { |
| exec.blacklistedInStages += stageId |
| liveUpdate(exec, now) |
| } |
| |
| private def setStageBlackListStatus(stage: LiveStage, now: Long, executorIds: String*): Unit = { |
| executorIds.foreach { executorId => |
| val executorStageSummary = stage.executorSummary(executorId) |
| executorStageSummary.isBlacklisted = true |
| maybeUpdate(executorStageSummary, now) |
| } |
| stage.blackListedExecutors ++= executorIds |
| maybeUpdate(stage, now) |
| } |
| |
| override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = { |
| updateBlackListStatus(event.executorId, false) |
| } |
| |
| override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = { |
| updateNodeBlackList(event.hostId, true) |
| } |
| |
| override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = { |
| updateNodeBlackList(event.hostId, false) |
| } |
| |
| private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { |
| liveExecutors.get(execId).foreach { exec => |
| exec.isBlacklisted = blacklisted |
| liveUpdate(exec, System.nanoTime()) |
| } |
| } |
| |
| private def updateNodeBlackList(host: String, blacklisted: Boolean): Unit = { |
| val now = System.nanoTime() |
| |
| // Implicitly (un)blacklist every executor associated with the node. |
| liveExecutors.values.foreach { exec => |
| if (exec.hostname == host) { |
| exec.isBlacklisted = blacklisted |
| liveUpdate(exec, now) |
| } |
| } |
| } |
| |
  override def onJobStart(event: SparkListenerJobStart): Unit = {
    val now = System.nanoTime()

    // Compute (a potential over-estimate of) the number of tasks that will be run by this job.
    // This may be an over-estimate because the job start event references all of the result
    // stages' transitive stage dependencies, but some of these stages might be skipped if their
    // output is available from earlier runs.
    // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
    val numTasks = {
      val missingStages = event.stageInfos.filter(_.completionTime.isEmpty)
      missingStages.map(_.numTasks).sum
    }

    // The last (highest-id) stage is the result stage; its name labels the whole job.
    val lastStageInfo = event.stageInfos.sortBy(_.stageId).lastOption
    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
    val description = Option(event.properties)
      .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)) }
    val jobGroup = Option(event.properties)
      .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }

    val job = new LiveJob(
      event.jobId,
      lastStageName,
      description,
      if (event.time > 0) Some(new Date(event.time)) else None,
      event.stageIds,
      jobGroup,
      numTasks)
    liveJobs.put(event.jobId, job)
    liveUpdate(job, now)

    event.stageInfos.foreach { stageInfo =>
      // A new job submission may re-use an existing stage, so this code needs to do an update
      // instead of just a write.
      val stage = getOrCreateStage(stageInfo)
      stage.jobs :+= job
      stage.jobIds += event.jobId
      liveUpdate(stage, now)
    }

    // Create the graph data for all the job's stages.
    event.stageInfos.foreach { stage =>
      val graph = RDDOperationGraph.makeOperationGraph(stage, maxGraphRootNodes)
      val uigraph = new RDDOperationGraphWrapper(
        stage.stageId,
        graph.edges,
        graph.outgoingEdges,
        graph.incomingEdges,
        newRDDOperationCluster(graph.rootCluster))
      kvstore.write(uigraph)
    }
  }
| |
| private def newRDDOperationCluster(cluster: RDDOperationCluster): RDDOperationClusterWrapper = { |
| new RDDOperationClusterWrapper( |
| cluster.id, |
| cluster.name, |
| cluster.childNodes, |
| cluster.childClusters.map(newRDDOperationCluster)) |
| } |
| |
  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    liveJobs.remove(event.jobId).foreach { job =>
      val now = System.nanoTime()

      // Check if there are any pending stages that match this job; mark those as skipped.
      // Uses the entry-set iterator so stages can be removed from liveStages while iterating.
      val it = liveStages.entrySet.iterator()
      while (it.hasNext()) {
        val e = it.next()
        if (job.stageIds.contains(e.getKey()._1)) {
          val stage = e.getValue()
          if (v1.StageStatus.PENDING.equals(stage.status)) {
            stage.status = v1.StageStatus.SKIPPED
            job.skippedStages += stage.info.stageId
            job.skippedTasks += stage.info.numTasks
            job.activeStages -= 1

            // The skipped stage no longer belongs to its scheduling pool.
            pools.get(stage.schedulingPool).foreach { pool =>
              pool.stageIds = pool.stageIds - stage.info.stageId
              update(pool, now)
            }

            it.remove()
            update(stage, now, last = true)
          }
        }
      }

      job.status = event.jobResult match {
        case JobSucceeded => JobExecutionStatus.SUCCEEDED
        case JobFailed(_) => JobExecutionStatus.FAILED
      }

      job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
      // Force the final job state into the store, bypassing live-update throttling.
      update(job, now, last = true)
      if (job.status == JobExecutionStatus.SUCCEEDED) {
        appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages)
        kvstore.write(appSummary)
      }
    }
  }
| |
  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    val now = System.nanoTime()
    val stage = getOrCreateStage(event.stageInfo)
    stage.status = v1.StageStatus.ACTIVE
    stage.schedulingPool = Option(event.properties).flatMap { p =>
      Option(p.getProperty("spark.scheduler.pool"))
    }.getOrElse(SparkUI.DEFAULT_POOL_NAME)

    // Look at all active jobs to find the ones that mention this stage.
    stage.jobs = liveJobs.values
      .filter(_.stageIds.contains(event.stageInfo.stageId))
      .toSeq
    stage.jobIds = stage.jobs.map(_.jobId).toSet

    stage.description = Option(event.properties).flatMap { p =>
      Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
    }

    stage.jobs.foreach { job =>
      // A resubmitted stage is no longer "completed" for its jobs.
      job.completedStages = job.completedStages - event.stageInfo.stageId
      job.activeStages += 1
      liveUpdate(job, now)
    }

    val pool = pools.getOrElseUpdate(stage.schedulingPool, new SchedulerPool(stage.schedulingPool))
    pool.stageIds = pool.stageIds + event.stageInfo.stageId
    update(pool, now)

    // Track any RDDs of this stage that are cached (valid storage level).
    event.stageInfo.rddInfos.foreach { info =>
      if (info.storageLevel.isValid) {
        liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info)), now)
      }
    }

    liveUpdate(stage, now)
  }
| |
  override def onTaskStart(event: SparkListenerTaskStart): Unit = {
    val now = System.nanoTime()
    val task = new LiveTask(event.taskInfo, event.stageId, event.stageAttemptId, lastUpdateTime)
    liveTasks.put(event.taskInfo.taskId, task)
    liveUpdate(task, now)

    Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
      stage.activeTasks += 1
      stage.firstLaunchTime = math.min(stage.firstLaunchTime, event.taskInfo.launchTime)

      // Count tasks per locality level for the stage summary.
      val locality = event.taskInfo.taskLocality.toString()
      val count = stage.localitySummary.getOrElse(locality, 0L) + 1L
      stage.localitySummary = stage.localitySummary ++ Map(locality -> count)
      stage.activeTasksPerExecutor(event.taskInfo.executorId) += 1
      maybeUpdate(stage, now)

      stage.jobs.foreach { job =>
        job.activeTasks += 1
        maybeUpdate(job, now)
      }

      // Kick off async cleanup when the retained-task cap is exceeded; `cleaning` prevents
      // scheduling more than one cleanup at a time.
      if (stage.savedTasks.incrementAndGet() > maxTasksPerStage && !stage.cleaning) {
        stage.cleaning = true
        kvstore.doAsync {
          cleanupTasks(stage)
        }
      }
    }

    liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
      exec.activeTasks += 1
      exec.totalTasks += 1
      maybeUpdate(exec, now)
    }
  }
| |
| override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = { |
| // Call update on the task so that the "getting result" time is written to the store; the |
| // value is part of the mutable TaskInfo state that the live entity already references. |
| liveTasks.get(event.taskInfo.taskId).foreach { task => |
| maybeUpdate(task, System.nanoTime()) |
| } |
| } |
| |
  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    // TODO: can this really happen?
    if (event.taskInfo == null) {
      return
    }

    val now = System.nanoTime()

    // Fold the task's final metrics into the live entity and compute the delta relative to the
    // last heartbeat, so stage/executor aggregates are only incremented by the difference.
    val metricsDelta = liveTasks.remove(event.taskInfo.taskId).map { task =>
      task.info = event.taskInfo

      val errorMessage = event.reason match {
        case Success =>
          None
        case k: TaskKilled =>
          Some(k.reason)
        case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates
          Some(e.toErrorString)
        case e: TaskFailedReason => // All other failure cases
          Some(e.toErrorString)
        case other =>
          logInfo(s"Unhandled task end reason: $other")
          None
      }
      task.errorMessage = errorMessage
      val delta = task.updateMetrics(event.taskMetrics)
      update(task, now, last = true)
      delta
    }.orNull

    // Classify the outcome into (completed, failed, killed) count deltas.
    val (completedDelta, failedDelta, killedDelta) = event.reason match {
      case Success =>
        (1, 0, 0)
      case _: TaskKilled =>
        (0, 0, 1)
      case _: TaskCommitDenied =>
        (0, 0, 1)
      case _ =>
        (0, 1, 0)
    }

    Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
      if (metricsDelta != null) {
        stage.metrics = LiveEntityHelpers.addMetrics(stage.metrics, metricsDelta)
      }
      stage.activeTasks -= 1
      stage.completedTasks += completedDelta
      if (completedDelta > 0) {
        stage.completedIndices.add(event.taskInfo.index)
      }
      stage.failedTasks += failedDelta
      stage.killedTasks += killedDelta
      if (killedDelta > 0) {
        stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary)
      }
      stage.activeTasksPerExecutor(event.taskInfo.executorId) -= 1
      // [SPARK-24415] Wait for all tasks to finish before removing stage from live list
      val removeStage =
        stage.activeTasks == 0 &&
          (v1.StageStatus.COMPLETE.equals(stage.status) ||
            v1.StageStatus.FAILED.equals(stage.status))
      if (removeStage) {
        update(stage, now, last = true)
      } else {
        maybeUpdate(stage, now)
      }

      // Store both stage ID and task index in a single long variable for tracking at job level.
      val taskIndex = (event.stageId.toLong << Integer.SIZE) | event.taskInfo.index
      stage.jobs.foreach { job =>
        job.activeTasks -= 1
        job.completedTasks += completedDelta
        if (completedDelta > 0) {
          job.completedIndices.add(taskIndex)
        }
        job.failedTasks += failedDelta
        job.killedTasks += killedDelta
        if (killedDelta > 0) {
          job.killedSummary = killedTasksSummary(event.reason, job.killedSummary)
        }
        if (removeStage) {
          update(job, now)
        } else {
          maybeUpdate(job, now)
        }
      }

      val esummary = stage.executorSummary(event.taskInfo.executorId)
      esummary.taskTime += event.taskInfo.duration
      esummary.succeededTasks += completedDelta
      esummary.failedTasks += failedDelta
      esummary.killedTasks += killedDelta
      if (metricsDelta != null) {
        esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta)
      }

      val isLastTask = stage.activeTasksPerExecutor(event.taskInfo.executorId) == 0

      // If the last task of the executor finished, then update the esummary
      // for both live and history events.
      if (isLastTask) {
        update(esummary, now)
      } else {
        maybeUpdate(esummary, now)
      }

      if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
        stage.cleaning = true
        kvstore.doAsync {
          cleanupTasks(stage)
        }
      }
      if (removeStage) {
        liveStages.remove((event.stageId, event.stageAttemptId))
      }
    }

    liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
      exec.activeTasks -= 1
      exec.completedTasks += completedDelta
      exec.failedTasks += failedDelta
      exec.totalDuration += event.taskInfo.duration

      // Note: For resubmitted tasks, we continue to use the metrics that belong to the
      // first attempt of this task. This may not be 100% accurate because the first attempt
      // could have failed half-way through. The correct fix would be to keep track of the
      // metrics added by each attempt, but this is much more complicated.
      if (event.reason != Resubmitted) {
        if (event.taskMetrics != null) {
          val readMetrics = event.taskMetrics.shuffleReadMetrics
          exec.totalGcTime += event.taskMetrics.jvmGCTime
          exec.totalInputBytes += event.taskMetrics.inputMetrics.bytesRead
          exec.totalShuffleRead += readMetrics.localBytesRead + readMetrics.remoteBytesRead
          exec.totalShuffleWrite += event.taskMetrics.shuffleWriteMetrics.bytesWritten
        }
      }

      // Force an update on both live and history applications when the number of active tasks
      // reaches 0. This is checked in some tests (e.g. SQLTestUtilsBase) so it needs to be
      // reliably up to date.
      if (exec.activeTasks == 0) {
        update(exec, now)
      } else {
        maybeUpdate(exec, now)
      }
    }
  }
| |
  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val maybeStage =
      Option(liveStages.get((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
    maybeStage.foreach { stage =>
      val now = System.nanoTime()
      stage.info = event.stageInfo

      // We have to update the stage status AFTER we create all the executorSummaries
      // because stage deletion deletes whatever summaries it finds when the status is completed.
      stage.executorSummaries.values.foreach(update(_, now))

      // Because of SPARK-20205, old event logs may contain valid stages without a submission time
      // in their start event. In those cases, we can only detect whether a stage was skipped by
      // waiting until the completion event, at which point the field would have been set.
      stage.status = event.stageInfo.failureReason match {
        case Some(_) => v1.StageStatus.FAILED
        case _ if event.stageInfo.submissionTime.isDefined => v1.StageStatus.COMPLETE
        case _ => v1.StageStatus.SKIPPED
      }

      // Propagate the stage's final status to every job it belongs to.
      stage.jobs.foreach { job =>
        stage.status match {
          case v1.StageStatus.COMPLETE =>
            job.completedStages += event.stageInfo.stageId
          case v1.StageStatus.SKIPPED =>
            job.skippedStages += event.stageInfo.stageId
            job.skippedTasks += event.stageInfo.numTasks
          case _ =>
            job.failedStages += 1
        }
        job.activeStages -= 1
        liveUpdate(job, now)
      }

      pools.get(stage.schedulingPool).foreach { pool =>
        pool.stageIds = pool.stageIds - event.stageInfo.stageId
        update(pool, now)
      }

      // Clear per-stage blacklist state from the executors now that the stage is done.
      val executorIdsForStage = stage.blackListedExecutors
      executorIdsForStage.foreach { executorId =>
        liveExecutors.get(executorId).foreach { exec =>
          removeBlackListedStageFrom(exec, event.stageInfo.stageId, now)
        }
      }

      // Remove stage only if there are no active tasks remaining
      val removeStage = stage.activeTasks == 0
      update(stage, now, last = removeStage)
      if (removeStage) {
        liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber))
      }
      if (stage.status == v1.StageStatus.COMPLETE) {
        appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
        kvstore.write(appSummary)
      }
    }
  }
| |
| private def removeBlackListedStageFrom(exec: LiveExecutor, stageId: Int, now: Long) = { |
| exec.blacklistedInStages -= stageId |
| liveUpdate(exec, now) |
| } |
| |
| override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = { |
| // This needs to set fields that are already set by onExecutorAdded because the driver is |
| // considered an "executor" in the UI, but does not have a SparkListenerExecutorAdded event. |
| val exec = getOrCreateExecutor(event.blockManagerId.executorId, event.time) |
| exec.hostPort = event.blockManagerId.hostPort |
| event.maxOnHeapMem.foreach { _ => |
| exec.totalOnHeap = event.maxOnHeapMem.get |
| exec.totalOffHeap = event.maxOffHeapMem.get |
| } |
| exec.isActive = true |
| exec.maxMemory = event.maxMem |
| liveUpdate(exec, System.nanoTime()) |
| } |
| |
  override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = {
    // Intentionally a no-op: executor cleanup is fully handled by onExecutorRemoved.
  }
| |
  override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
    liveRDDs.remove(event.rddId).foreach { liveRDD =>
      val storageLevel = liveRDD.info.storageLevel

      // Use RDD partition info to update executor block info.
      liveRDD.getPartitions().foreach { case (_, part) =>
        part.executors.foreach { executorId =>
          liveExecutors.get(executorId).foreach { exec =>
            exec.rddBlocks = exec.rddBlocks - 1
          }
        }
      }

      val now = System.nanoTime()

      // Use RDD distribution to update executor memory and disk usage info.
      // addDeltaToValue clamps at zero so accounting can't go negative.
      liveRDD.getDistributions().foreach { case (executorId, rddDist) =>
        liveExecutors.get(executorId).foreach { exec =>
          if (exec.hasMemoryInfo) {
            if (storageLevel.useOffHeap) {
              exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, -rddDist.offHeapUsed)
            } else {
              exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, -rddDist.onHeapUsed)
            }
          }
          exec.memoryUsed = addDeltaToValue(exec.memoryUsed, -rddDist.memoryUsed)
          exec.diskUsed = addDeltaToValue(exec.diskUsed, -rddDist.diskUsed)
          maybeUpdate(exec, now)
        }
      }
    }

    // Drop the stored RDD info whether or not a live entity existed for it.
    kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId)
  }
| |
  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    val now = System.nanoTime()

    // Fold each task's accumulator updates into its live task, and propagate the metric deltas
    // up to the owning stage and that stage's per-executor summary.
    event.accumUpdates.foreach { case (taskId, sid, sAttempt, accumUpdates) =>
      liveTasks.get(taskId).foreach { task =>
        val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates)
        val delta = task.updateMetrics(metrics)
        maybeUpdate(task, now)

        Option(liveStages.get((sid, sAttempt))).foreach { stage =>
          stage.metrics = LiveEntityHelpers.addMetrics(stage.metrics, delta)
          maybeUpdate(stage, now)

          val esummary = stage.executorSummary(event.execId)
          esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, delta)
          maybeUpdate(esummary, now)
        }
      }
    }

    // Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush
    // here to ensure the staleness of Spark UI doesn't last more than
    // `max(heartbeat interval, liveUpdateMinFlushPeriod)`.
    if (now - lastFlushTimeNs > liveUpdateMinFlushPeriod) {
      flush(maybeUpdate(_, now))
      // Re-get the current system time because `flush` may be slow and `now` is stale.
      lastFlushTimeNs = System.nanoTime()
    }
  }
| |
| override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { |
| event.blockUpdatedInfo.blockId match { |
| case block: RDDBlockId => updateRDDBlock(event, block) |
| case stream: StreamBlockId => updateStreamBlock(event, stream) |
| case broadcast: BroadcastBlockId => updateBroadcastBlock(event, broadcast) |
| case _ => |
| } |
| } |
| |
| /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */ |
| private def flush(entityFlushFunc: LiveEntity => Unit): Unit = { |
| liveStages.values.asScala.foreach { stage => |
| entityFlushFunc(stage) |
| stage.executorSummaries.values.foreach(entityFlushFunc) |
| } |
| liveJobs.values.foreach(entityFlushFunc) |
| liveExecutors.values.foreach(entityFlushFunc) |
| liveTasks.values.foreach(entityFlushFunc) |
| liveRDDs.values.foreach(entityFlushFunc) |
| pools.values.foreach(entityFlushFunc) |
| } |
| |
| /** |
| * Shortcut to get active stages quickly in a live application, for use by the console |
| * progress bar. |
| */ |
| def activeStages(): Seq[v1.StageData] = { |
| liveStages.values.asScala |
| .filter(_.info.submissionTime.isDefined) |
| .map(_.toApi()) |
| .toList |
| .sortBy(_.stageId) |
| } |
| |
| /** |
| * Apply a delta to a value, but ensure that it doesn't go negative. |
| */ |
| private def addDeltaToValue(old: Long, delta: Long): Long = math.max(0, old + delta) |
| |
  private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = {
    val now = System.nanoTime()
    val executorId = event.blockUpdatedInfo.blockManagerId.executorId

    // Whether values are being added to or removed from the existing accounting.
    // An invalid storage level means the block was removed, so sizes count as negative deltas.
    val storageLevel = event.blockUpdatedInfo.storageLevel
    val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1)
    val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1)

    val updatedStorageLevel = if (storageLevel.isValid) {
      Some(storageLevel.description)
    } else {
      None
    }

    // We need information about the executor to update some memory accounting values in the
    // RDD info, so read that beforehand.
    val maybeExec = liveExecutors.get(executorId)
    // +1 when this executor gains its first replica of the block, -1 when the block is dropped.
    var rddBlocksDelta = 0

    // Update the executor stats first, since they are used to calculate the free memory
    // on tracked RDD distributions.
    maybeExec.foreach { exec =>
      updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta)
    }

    // Update the block entry in the RDD info, keeping track of the deltas above so that we
    // can update the executor information too.
    liveRDDs.get(block.rddId).foreach { rdd =>
      if (updatedStorageLevel.isDefined) {
        rdd.setStorageLevel(updatedStorageLevel.get)
      }

      val partition = rdd.partition(block.name)

      val executors = if (updatedStorageLevel.isDefined) {
        val current = partition.executors
        if (current.contains(executorId)) {
          current
        } else {
          rddBlocksDelta = 1
          current :+ executorId
        }
      } else {
        rddBlocksDelta = -1
        partition.executors.filter(_ != executorId)
      }

      // Only update the partition if it's still stored in some executor, otherwise get rid of it.
      if (executors.nonEmpty) {
        partition.update(executors, rdd.storageLevel,
          addDeltaToValue(partition.memoryUsed, memoryDelta),
          addDeltaToValue(partition.diskUsed, diskDelta))
      } else {
        rdd.removePartition(block.name)
      }

      maybeExec.foreach { exec =>
        if (exec.rddBlocks + rddBlocksDelta > 0) {
          val dist = rdd.distribution(exec)
          dist.memoryUsed = addDeltaToValue(dist.memoryUsed, memoryDelta)
          dist.diskUsed = addDeltaToValue(dist.diskUsed, diskDelta)

          if (exec.hasMemoryInfo) {
            if (storageLevel.useOffHeap) {
              dist.offHeapUsed = addDeltaToValue(dist.offHeapUsed, memoryDelta)
            } else {
              dist.onHeapUsed = addDeltaToValue(dist.onHeapUsed, memoryDelta)
            }
          }
          // Invalidate the cached serialized form so the next write recomputes it.
          dist.lastUpdate = null
        } else {
          rdd.removeDistribution(exec)
        }

        // Trigger an update on other RDDs so that the free memory information is updated.
        liveRDDs.values.foreach { otherRdd =>
          if (otherRdd.info.id != block.rddId) {
            otherRdd.distributionOpt(exec).foreach { dist =>
              dist.lastUpdate = null
              update(otherRdd, now)
            }
          }
        }
      }

      rdd.memoryUsed = addDeltaToValue(rdd.memoryUsed, memoryDelta)
      rdd.diskUsed = addDeltaToValue(rdd.diskUsed, diskDelta)
      update(rdd, now)
    }

    // Finish updating the executor now that we know the delta in the number of blocks.
    maybeExec.foreach { exec =>
      exec.rddBlocks += rddBlocksDelta
      maybeUpdate(exec, now)
    }
  }
| |
| private def getOrCreateExecutor(executorId: String, addTime: Long): LiveExecutor = { |
| liveExecutors.getOrElseUpdate(executorId, { |
| activeExecutorCount += 1 |
| new LiveExecutor(executorId, addTime) |
| }) |
| } |
| |
| private def updateStreamBlock(event: SparkListenerBlockUpdated, stream: StreamBlockId): Unit = { |
| val storageLevel = event.blockUpdatedInfo.storageLevel |
| if (storageLevel.isValid) { |
| val data = new StreamBlockData( |
| stream.name, |
| event.blockUpdatedInfo.blockManagerId.executorId, |
| event.blockUpdatedInfo.blockManagerId.hostPort, |
| storageLevel.description, |
| storageLevel.useMemory, |
| storageLevel.useDisk, |
| storageLevel.deserialized, |
| event.blockUpdatedInfo.memSize, |
| event.blockUpdatedInfo.diskSize) |
| kvstore.write(data) |
| } else { |
| kvstore.delete(classOf[StreamBlockData], |
| Array(stream.name, event.blockUpdatedInfo.blockManagerId.executorId)) |
| } |
| } |
| |
| private def updateBroadcastBlock( |
| event: SparkListenerBlockUpdated, |
| broadcast: BroadcastBlockId): Unit = { |
| val executorId = event.blockUpdatedInfo.blockManagerId.executorId |
| liveExecutors.get(executorId).foreach { exec => |
| val now = System.nanoTime() |
| val storageLevel = event.blockUpdatedInfo.storageLevel |
| |
| // Whether values are being added to or removed from the existing accounting. |
| val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1) |
| val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1) |
| |
| updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta) |
| maybeUpdate(exec, now) |
| } |
| } |
| |
| private def updateExecutorMemoryDiskInfo( |
| exec: LiveExecutor, |
| storageLevel: StorageLevel, |
| memoryDelta: Long, |
| diskDelta: Long): Unit = { |
| if (exec.hasMemoryInfo) { |
| if (storageLevel.useOffHeap) { |
| exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta) |
| } else { |
| exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta) |
| } |
| } |
| exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta) |
| exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta) |
| } |
| |
| private def getOrCreateStage(info: StageInfo): LiveStage = { |
| val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber), |
| new Function[(Int, Int), LiveStage]() { |
| override def apply(key: (Int, Int)): LiveStage = new LiveStage() |
| }) |
| stage.info = info |
| stage |
| } |
| |
| private def killedTasksSummary( |
| reason: TaskEndReason, |
| oldSummary: Map[String, Int]): Map[String, Int] = { |
| reason match { |
| case k: TaskKilled => |
| oldSummary.updated(k.reason, oldSummary.getOrElse(k.reason, 0) + 1) |
| case denied: TaskCommitDenied => |
| val reason = denied.toErrorString |
| oldSummary.updated(reason, oldSummary.getOrElse(reason, 0) + 1) |
| case _ => |
| oldSummary |
| } |
| } |
| |
  /**
   * Persist `entity` to the store. `last` is forwarded as `checkTriggers`
   * (NOTE(review): presumably so a final write also evaluates the store's tracking/cleanup
   * triggers — confirm against ElementTrackingStore).
   */
  private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = {
    entity.write(kvstore, now, checkTriggers = last)
  }
| |
| /** Update a live entity only if it hasn't been updated in the last configured period. */ |
| private def maybeUpdate(entity: LiveEntity, now: Long): Unit = { |
| if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) { |
| update(entity, now) |
| } |
| } |
| |
| /** Update an entity only if in a live app; avoids redundant writes when replaying logs. */ |
| private def liveUpdate(entity: LiveEntity, now: Long): Unit = { |
| if (live) { |
| update(entity, now) |
| } |
| } |
| |
| private def cleanupExecutors(count: Long): Unit = { |
| // Because the limit is on the number of *dead* executors, we need to calculate whether |
| // there are actually enough dead executors to be deleted. |
| val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS) |
| val dead = count - activeExecutorCount |
| |
| if (dead > threshold) { |
| val countToDelete = calculateNumberToRemove(dead, threshold) |
| val toDelete = kvstore.view(classOf[ExecutorSummaryWrapper]).index("active") |
| .max(countToDelete).first(false).last(false).asScala.toSeq |
| toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) } |
| } |
| } |
| |
| private def cleanupJobs(count: Long): Unit = { |
| val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS)) |
| if (countToDelete <= 0L) { |
| return |
| } |
| |
| val view = kvstore.view(classOf[JobDataWrapper]).index("completionTime").first(0L) |
| val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j => |
| j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN |
| } |
| toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) } |
| } |
| |
| private def cleanupStages(count: Long): Unit = { |
| val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES)) |
| if (countToDelete <= 0L) { |
| return |
| } |
| |
| // As the completion time of a skipped stage is always -1, we will remove skipped stages first. |
| // This is safe since the job itself contains enough information to render skipped stages in the |
| // UI. |
| val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime") |
| val stages = KVUtils.viewToSeq(view, countToDelete.toInt) { s => |
| s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING |
| } |
| |
| val stageIds = stages.map { s => |
| val key = Array(s.info.stageId, s.info.attemptId) |
| kvstore.delete(s.getClass(), key) |
| |
| // Check whether there are remaining attempts for the same stage. If there aren't, then |
| // also delete the RDD graph data. |
| val remainingAttempts = kvstore.view(classOf[StageDataWrapper]) |
| .index("stageId") |
| .first(s.info.stageId) |
| .last(s.info.stageId) |
| .closeableIterator() |
| |
| val hasMoreAttempts = try { |
| remainingAttempts.asScala.exists { other => |
| other.info.attemptId != s.info.attemptId |
| } |
| } finally { |
| remainingAttempts.close() |
| } |
| |
| if (!hasMoreAttempts) { |
| kvstore.delete(classOf[RDDOperationGraphWrapper], s.info.stageId) |
| } |
| |
| cleanupCachedQuantiles(key) |
| key |
| } |
| |
| // Delete summaries in one pass, as deleting them for each stage is slow |
| kvstore.removeAllByIndexValues(classOf[ExecutorStageSummaryWrapper], "stage", stageIds) |
| |
| // Delete tasks for all stages in one pass, as deleting them for each stage individually is slow |
| kvstore.removeAllByIndexValues(classOf[TaskDataWrapper], TaskIndexNames.STAGE, stageIds) |
| } |
| |
| private def cleanupTasks(stage: LiveStage): Unit = { |
| val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt |
| if (countToDelete > 0) { |
| val stageKey = Array(stage.info.stageId, stage.info.attemptNumber) |
| val view = kvstore.view(classOf[TaskDataWrapper]) |
| .index(TaskIndexNames.COMPLETION_TIME) |
| .parent(stageKey) |
| |
| // Try to delete finished tasks only. |
| val toDelete = KVUtils.viewToSeq(view, countToDelete) { t => |
| !live || t.status != TaskState.RUNNING.toString() |
| } |
| toDelete.foreach { t => kvstore.delete(t.getClass(), t.taskId) } |
| stage.savedTasks.addAndGet(-toDelete.size) |
| |
| // If there are more running tasks than the configured limit, delete running tasks. This |
| // should be extremely rare since the limit should generally far exceed the number of tasks |
| // that can run in parallel. |
| val remaining = countToDelete - toDelete.size |
| if (remaining > 0) { |
| val runningTasksToDelete = view.max(remaining).iterator().asScala.toList |
| runningTasksToDelete.foreach { t => kvstore.delete(t.getClass(), t.taskId) } |
| stage.savedTasks.addAndGet(-remaining) |
| } |
| |
| // On live applications, cleanup any cached quantiles for the stage. This makes sure that |
| // quantiles will be recalculated after tasks are replaced with newer ones. |
| // |
| // This is not needed in the SHS since caching only happens after the event logs are |
| // completely processed. |
| if (live) { |
| cleanupCachedQuantiles(stageKey) |
| } |
| } |
| stage.cleaning = false |
| } |
| |
| private def cleanupCachedQuantiles(stageKey: Array[Int]): Unit = { |
| val cachedQuantiles = kvstore.view(classOf[CachedQuantile]) |
| .index("stage") |
| .first(stageKey) |
| .last(stageKey) |
| .asScala |
| .toList |
| cachedQuantiles.foreach { q => |
| kvstore.delete(q.getClass(), q.id) |
| } |
| } |
| |
| /** |
| * Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done |
| * asynchronously, this method may return 0 in case enough items have been deleted already. |
| */ |
| private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = { |
| if (dataSize > retainedSize) { |
| math.max(retainedSize / 10L, dataSize - retainedSize) |
| } else { |
| 0L |
| } |
| } |
| |
| } |