package org.apache.spark.status
import java.util.Date
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Function
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._
import org.apache.spark.status.api.v1
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.scope._
/**
 * A Spark listener that writes application information to a data store. The types written to the
 * store are defined in the `storeTypes.scala` file and are based on the public REST API.
 *
 * @param lastUpdateTime When replaying logs, the log's last update time, so that the duration of
 *                       unfinished tasks can be more accurately calculated (see SPARK-21922).
 */
private[spark] class AppStatusListener(
// Store that entities are written to; also used below to register retention triggers.
kvstore: ElementTrackingStore,
conf: SparkConf,
// True for a running application; false when replaying an event log, in which case most
// intermediate writes are skipped (see liveUpdatePeriodNs, maybeUpdate and liveUpdate).
live: Boolean,
lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
// Presumably brings the status config entries used below (LIVE_ENTITY_UPDATE_PERIOD,
// MAX_RETAINED_*, ...) into scope — confirm against the `config` object.
import config._
// Version of Spark that produced the events; replaced by SparkListenerLogStart when replaying.
private var sparkVersion = SPARK_VERSION
// Application info; null until onApplicationStart writes it.
private var appInfo: v1.ApplicationInfo = null
// Counters of successfully completed jobs and completed stages.
private var appSummary = new AppSummary(0, 0)
// Value of spark.task.cpus; used to derive an executor's task slots from its core count.
private var coresPerTask: Int = 1
// How often to update live entities. -1 means "never update" when replaying applications,
// meaning only the last write will happen. For live applications, this avoids a few
// operations that we can live without when rapidly processing incoming task events.
private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
/**
 * Minimum time that must elapse before stale UI data is flushed. This avoids UI staleness when
 * incoming task events are not fired frequently.
 */
private val liveUpdateMinFlushPeriod = conf.get(LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD)
// Upper bound on tasks retained per stage; cleanupTasks enforces it.
private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE)
// Passed to RDDOperationGraph.makeOperationGraph when building each stage's graph.
private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)
// Keep track of live entities, so that task metrics can be efficiently updated (without
// causing too many writes to the underlying store, and other expensive operations).
// Keyed by (stageId, stageAttemptId). Unlike the maps below this is a ConcurrentHashMap,
// presumably because activeStages() may be called from another thread — confirm.
private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
private val liveJobs = new HashMap[Int, LiveJob]()
private val liveExecutors = new HashMap[String, LiveExecutor]()
private val liveTasks = new HashMap[Long, LiveTask]()
private val liveRDDs = new HashMap[Int, LiveRDD]()
private val pools = new HashMap[String, SchedulerPool]()
// Keep the active executor count as a separate variable to avoid having to do synchronization
// around liveExecutors.
@volatile private var activeExecutorCount = 0
/** The last time when flushing `LiveEntity`s. This is to avoid flushing too frequently. */
private var lastFlushTimeNs = System.nanoTime()
// Retention triggers: once the store holds more entries of a type than the configured limit,
// the callback prunes old entries.
kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
{ count => cleanupExecutors(count) }
kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>
kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count =>
// NOTE(review): the job and stage trigger bodies above appear truncated in this copy;
// presumably they call cleanupJobs(count) / cleanupStages(count) — confirm.
// On store flush during replay, write every live entity so its final state is persisted.
kvstore.onFlush {
if (!live) {
val now = System.nanoTime()
flush(update(_, now))
/**
 * Handles events with no dedicated callback. Only the log-start marker matters here: it
 * carries the Spark version that wrote the event log.
 */
override def onOtherEvent(event: SparkListenerEvent): Unit = {
  event match {
    case SparkListenerLogStart(loggedVersion) =>
      sparkVersion = loggedVersion
    case _ =>
      // Any other event is ignored by this listener.
  }
}
/**
 * Records the application and its first attempt in the store, then attaches any driver log
 * links from the event to the already-registered driver executor.
 */
override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
assert(event.appId.isDefined, "Application without IDs are not supported.")
// NOTE(review): the constructor argument lists below look truncated in this copy. The visible
// attempt fields are start time, end time (Date(-1), presumably "not ended yet") and last
// updated time — confirm.
val attempt = v1.ApplicationAttemptInfo(
new Date(event.time),
new Date(-1),
new Date(event.time),
appInfo = v1.ApplicationInfo(
kvstore.write(new ApplicationInfoWrapper(appInfo))
// Update the driver block manager with logs from this event. The SparkContext initialization
// code registers the driver before this event is sent.
event.driverLogs.foreach { logs =>
val driver = liveExecutors.get(SparkContext.DRIVER_IDENTIFIER)
driver.foreach { d =>
d.executorLogs = logs.toMap
update(d, System.nanoTime())
/**
 * Builds the runtime and environment info (JVM details plus Spark/system properties and
 * classpath entries) from the event, refreshes `coresPerTask` from spark.task.cpus, and
 * writes the environment to the store.
 */
override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
val details = event.environmentDetails
val jvmInfo = Map(details("JVM Information"): _*)
val runtime = new v1.RuntimeInfo(
jvmInfo.get("Java Version").orNull,
jvmInfo.get("Java Home").orNull,
jvmInfo.get("Scala Version").orNull)
// NOTE(review): `runtime` is not passed below in this copy; the argument list looks
// truncated — confirm against ApplicationEnvironmentInfo's constructor.
val envInfo = new v1.ApplicationEnvironmentInfo(
details.getOrElse("Spark Properties", Nil),
details.getOrElse("System Properties", Nil),
details.getOrElse("Classpath Entries", Nil))
// NOTE(review): as written this assigns an Option[Int] to an Int; presumably a fallback such
// as `.getOrElse(coresPerTask)` is missing in this copy — confirm.
coresPerTask = envInfo.sparkProperties.toMap.get("spark.task.cpus").map(_.toInt)
kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo))
/**
 * Rewrites the application info with an attempt whose end time is the event time and whose
 * duration is measured from the original attempt's start time.
 */
override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
// Assumes onApplicationStart already populated appInfo with at least one attempt.
val old = appInfo.attempts.head
// NOTE(review): the constructor argument lists below look truncated in this copy — confirm.
val attempt = v1.ApplicationAttemptInfo(
new Date(event.time),
new Date(event.time),
event.time - old.startTime.getTime(),
appInfo = v1.ApplicationInfo(,,
kvstore.write(new ApplicationInfoWrapper(appInfo))
/** Registers (or re-activates) an executor and records its cores, task slots and log links. */
override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
// This needs to be an update in case an executor re-registers after the driver has
// marked it as "dead".
// NOTE(review): the next line looks like two statements fused in this copy (the executor
// lookup, then presumably a host assignment from executorInfo.executorHost) — confirm.
val exec = getOrCreateExecutor(event.executorId, event.time) = event.executorInfo.executorHost
exec.isActive = true
exec.totalCores = event.executorInfo.totalCores
// Task slots: cores divided (integer division) by the cores each task needs.
exec.maxTasks = event.executorInfo.totalCores / coresPerTask
exec.executorLogs = event.executorInfo.logUrlMap
liveUpdate(exec, System.nanoTime())
/**
 * Marks a removed executor as dead, performs its last store write, and detaches it from any
 * RDD distributions that still reference it.
 */
override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
  val removed = liveExecutors.remove(event.executorId)
  for (exec <- removed) {
    val now = System.nanoTime()
    // Never let the counter drop below zero.
    activeExecutorCount = math.max(0, activeExecutorCount - 1)

    exec.isActive = false
    exec.removeTime = new Date(event.time)
    exec.removeReason = event.reason
    update(exec, now, last = true)

    // Remove all RDD distributions that reference the removed executor, in case there wasn't
    // a corresponding event. `withFilter` keeps each removal and write interleaved per RDD.
    for (rdd <- liveRDDs.values if rdd.removeDistribution(exec)) {
      update(rdd, now)
    }
  }
}
/** Flags the executor as blacklisted application-wide. */
override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit =
  updateBlackListStatus(event.executorId, blacklisted = true)
/**
 * Records that one executor is blacklisted for a stage: the stage attempt (if still live)
 * marks the executor, and the executor (if still live) records the stage.
 */
override def onExecutorBlacklistedForStage(
    event: SparkListenerExecutorBlacklistedForStage): Unit = {
  val now = System.nanoTime()
  val stageKey = (event.stageId, event.stageAttemptId)

  Option(liveStages.get(stageKey)).foreach(setStageBlackListStatus(_, now, event.executorId))

  for (exec <- liveExecutors.get(event.executorId)) {
    addBlackListedStageTo(exec, event.stageId, now)
  }
}
/**
 * Blacklists a whole node for a stage. Every live executor on that node is implicitly
 * blacklisted for the stage attempt (when it is still live), and each such executor records
 * the stage.
 */
override def onNodeBlacklistedForStage(event: SparkListenerNodeBlacklistedForStage): Unit = {
  val now = System.nanoTime()

  // Resolve the node's executors once. Match on `hostname`, as the executor loop below and
  // updateNodeBlackList do, so both the stage and the executors see the same set.
  val executorsOnNode = liveExecutors.values.filter(_.hostname == event.hostId).toList

  Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
    setStageBlackListStatus(stage, now, executorsOnNode.map(_.executorId): _*)
  }
  executorsOnNode.foreach { exec =>
    addBlackListedStageTo(exec, event.stageId, now)
  }
}
/**
 * Adds `stageId` to the stages the executor is blacklisted in, then writes the executor
 * when running live.
 */
private def addBlackListedStageTo(exec: LiveExecutor, stageId: Int, now: Long): Unit = {
  exec.blacklistedInStages += stageId
  liveUpdate(exec, now)
}
/**
 * Marks each executor's per-stage summary as blacklisted and records the executors on the
 * stage itself. Writes are throttled through maybeUpdate.
 */
private def setStageBlackListStatus(stage: LiveStage, now: Long, executorIds: String*): Unit = {
  for (executorId <- executorIds) {
    val summary = stage.executorSummary(executorId)
    summary.isBlacklisted = true
    maybeUpdate(summary, now)
  }
  stage.blackListedExecutors ++= executorIds
  maybeUpdate(stage, now)
}
/** Clears the executor's application-wide blacklist flag. */
override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit =
  updateBlackListStatus(event.executorId, blacklisted = false)
/** Blacklists every live executor on the given node. */
override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit =
  updateNodeBlackList(event.hostId, blacklisted = true)
/** Removes the blacklist flag from every live executor on the given node. */
override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit =
  updateNodeBlackList(event.hostId, blacklisted = false)
/** Sets the blacklist flag on a single live executor, if it exists, and writes it when live. */
private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
  for (exec <- liveExecutors.get(execId)) {
    exec.isBlacklisted = blacklisted
    liveUpdate(exec, System.nanoTime())
  }
}
/**
 * Applies the blacklist flag to every live executor whose hostname matches `host`. All writes
 * share one timestamp.
 */
private def updateNodeBlackList(host: String, blacklisted: Boolean): Unit = {
  val now = System.nanoTime()
  for (exec <- liveExecutors.values if exec.hostname == host) {
    exec.isBlacklisted = blacklisted
    liveUpdate(exec, now)
  }
}
/**
 * Creates the live job for a newly submitted job and links every stage in the event to it.
 * Also builds RDD operation graph data for each of those stages.
 */
override def onJobStart(event: SparkListenerJobStart): Unit = {
val now = System.nanoTime()
// Compute (a potential over-estimate of) the number of tasks that will be run by this job.
// This may be an over-estimate because the job start event references all of the result
// stages' transitive stage dependencies, but some of these stages might be skipped if their
// output is available from earlier runs.
// See for a more extensive discussion.
// NOTE(review): the issue reference in the line above is missing in this copy; the numTasks
// computation and the name/description/LiveJob expressions below also look truncated.
val numTasks = {
val missingStages = event.stageInfos.filter(_.completionTime.isEmpty)
val lastStageInfo = event.stageInfos.sortBy(_.stageId).lastOption
val lastStageName ="(Unknown Stage Name)")
// Optional job description and job group, read from the job's properties when present.
val description = Option(
.flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)) }
val jobGroup = Option(
.flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
// A non-positive event time is recorded as an unknown (None) submission time.
val job = new LiveJob(
if (event.time > 0) Some(new Date(event.time)) else None,
liveJobs.put(event.jobId, job)
liveUpdate(job, now)
event.stageInfos.foreach { stageInfo =>
// A new job submission may re-use an existing stage, so this code needs to do an update
// instead of just a write.
// NOTE(review): the next line looks like two statements fused in this copy — confirm.
val stage = getOrCreateStage(stageInfo) :+= job
stage.jobIds += event.jobId
liveUpdate(stage, now)
// Create the graph data for all the job's stages.
event.stageInfos.foreach { stage =>
val graph = RDDOperationGraph.makeOperationGraph(stage, maxGraphRootNodes)
val uigraph = new RDDOperationGraphWrapper(
/**
 * Converts an RDDOperationCluster into its store wrapper type.
 * NOTE(review): the constructor arguments are truncated in this copy; presumably child clusters
 * are converted recursively — confirm.
 */
private def newRDDOperationCluster(cluster: RDDOperationCluster): RDDOperationClusterWrapper = {
new RDDOperationClusterWrapper(,,
/**
 * Finalizes a job:
 *  - any of its stages still PENDING is marked SKIPPED and removed from its scheduling pool;
 *  - the job's status and completion time are set and written;
 *  - the completed-jobs counter is bumped on success.
 */
override def onJobEnd(event: SparkListenerJobEnd): Unit = {
liveJobs.remove(event.jobId).foreach { job =>
val now = System.nanoTime()
// Check if there are any pending stages that match this job; mark those as skipped.
// NOTE(review): several right-hand sides in this loop are truncated in this copy — confirm.
val it = liveStages.entrySet.iterator()
while (it.hasNext()) {
val e =
if (job.stageIds.contains(e.getKey()._1)) {
val stage = e.getValue()
if (v1.StageStatus.PENDING.equals(stage.status)) {
stage.status = v1.StageStatus.SKIPPED
job.skippedStages +=
job.skippedTasks +=
job.activeStages -= 1
pools.get(stage.schedulingPool).foreach { pool =>
pool.stageIds = pool.stageIds -
update(pool, now)
update(stage, now, last = true)
job.status = event.jobResult match {
case JobSucceeded => JobExecutionStatus.SUCCEEDED
case JobFailed(_) => JobExecutionStatus.FAILED
// A non-positive event time is recorded as an unknown (None) completion time.
job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
update(job, now, last = true)
if (job.status == JobExecutionStatus.SUCCEEDED) {
appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages)
/**
 * Activates a submitted stage attempt. It:
 *  - links the stage to the jobs that reference it and updates those jobs' stage counters;
 *  - registers the stage in its scheduling pool;
 *  - tracks any RDDs with a valid storage level.
 */
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
val now = System.nanoTime()
val stage = getOrCreateStage(event.stageInfo)
stage.status = v1.StageStatus.ACTIVE
// NOTE(review): the pool, job-id and description assignments below are truncated/fused in
// this copy — confirm against the original source.
stage.schedulingPool = Option( { p =>
// Look at all active jobs to find the ones that mention this stage. = liveJobs.values
stage.jobIds =
stage.description = Option( { p =>
} { job =>
// A re-submitted stage is no longer counted as completed for the job.
job.completedStages = job.completedStages - event.stageInfo.stageId
job.activeStages += 1
liveUpdate(job, now)
val pool = pools.getOrElseUpdate(stage.schedulingPool, new SchedulerPool(stage.schedulingPool))
pool.stageIds = pool.stageIds + event.stageInfo.stageId
update(pool, now)
// Only RDDs with a valid storage level are tracked as live RDDs.
event.stageInfo.rddInfos.foreach { info =>
if (info.storageLevel.isValid) {
liveUpdate(liveRDDs.getOrElseUpdate(, new LiveRDD(info)), now)
liveUpdate(stage, now)
/**
 * Tracks a newly started task and bumps the active/total task counters. The counters touched
 * are those of the task's stage attempt, its jobs and its executor. Also updates the stage's
 * first launch time and locality summary.
 */
override def onTaskStart(event: SparkListenerTaskStart): Unit = {
val now = System.nanoTime()
val task = new LiveTask(event.taskInfo, event.stageId, event.stageAttemptId, lastUpdateTime)
liveTasks.put(event.taskInfo.taskId, task)
liveUpdate(task, now)
Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
stage.activeTasks += 1
stage.firstLaunchTime = math.min(stage.firstLaunchTime, event.taskInfo.launchTime)
// Count tasks per locality level.
val locality = event.taskInfo.taskLocality.toString()
val count = stage.localitySummary.getOrElse(locality, 0L) + 1L
stage.localitySummary = stage.localitySummary ++ Map(locality -> count)
stage.activeTasksPerExecutor(event.taskInfo.executorId) += 1
// NOTE(review): the job loop and the saved-task cleanup condition below are fused/truncated
// in this copy; once more than maxTasksPerStage tasks are saved, cleanup is scheduled via
// kvstore.doAsync — confirm exact condition.
maybeUpdate(stage, now) { job =>
job.activeTasks += 1
maybeUpdate(job, now)
if (stage.savedTasks.incrementAndGet() > maxTasksPerStage && ! { = true
kvstore.doAsync {
liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
exec.activeTasks += 1
exec.totalTasks += 1
maybeUpdate(exec, now)
/**
 * Re-writes the live task so that its "getting result" time reaches the store. That time is
 * part of the mutable TaskInfo the live task already references, so no copy is needed.
 */
override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = {
  for (task <- liveTasks.get(event.taskInfo.taskId)) {
    maybeUpdate(task, System.nanoTime())
  }
}
/**
 * Finalizes a task. It:
 *  - records the task's error message and metrics delta;
 *  - rolls completed/failed/killed counts and metrics into its stage attempt, jobs,
 *    per-executor stage summary and executor;
 *  - removes the stage from the live set once it is finished and has no active tasks left.
 */
override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
// TODO: can this really happen?
// NOTE(review): the body of this null check looks truncated in this copy (presumably an early
// return) — confirm.
if (event.taskInfo == null) {
val now = System.nanoTime()
// Metrics delta of the finished task. Code below null-checks it before adding it to stage and
// executor-summary metrics.
// NOTE(review): the following line and the error-message match arms are truncated in this copy.
val metricsDelta = liveTasks.remove(event.taskInfo.taskId).map { task => = event.taskInfo
val errorMessage = event.reason match {
case Success =>
case k: TaskKilled =>
case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates
case e: TaskFailedReason => // All other failure cases
case other =>
logInfo(s"Unhandled task end reason: $other")
task.errorMessage = errorMessage
val delta = task.updateMetrics(event.taskMetrics)
update(task, now, last = true)
// Classify the outcome: success counts as completed; TaskKilled and TaskCommitDenied count as
// killed; every other reason counts as failed.
val (completedDelta, failedDelta, killedDelta) = event.reason match {
case Success =>
(1, 0, 0)
case _: TaskKilled =>
(0, 0, 1)
case _: TaskCommitDenied =>
(0, 0, 1)
case _ =>
(0, 1, 0)
Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
if (metricsDelta != null) {
stage.metrics = LiveEntityHelpers.addMetrics(stage.metrics, metricsDelta)
stage.activeTasks -= 1
stage.completedTasks += completedDelta
if (completedDelta > 0) {
stage.failedTasks += failedDelta
stage.killedTasks += killedDelta
if (killedDelta > 0) {
stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary)
stage.activeTasksPerExecutor(event.taskInfo.executorId) -= 1
// [SPARK-24415] Wait for all tasks to finish before removing stage from live list
val removeStage =
stage.activeTasks == 0 &&
(v1.StageStatus.COMPLETE.equals(stage.status) ||
// A stage leaving the live set gets its last write; otherwise writes are throttled.
if (removeStage) {
update(stage, now, last = true)
} else {
maybeUpdate(stage, now)
// Store both stage ID and task index in a single long variable for tracking at job level.
val taskIndex = (event.stageId.toLong << Integer.SIZE) | event.taskInfo.index { job =>
job.activeTasks -= 1
job.completedTasks += completedDelta
if (completedDelta > 0) {
job.failedTasks += failedDelta
job.killedTasks += killedDelta
if (killedDelta > 0) {
job.killedSummary = killedTasksSummary(event.reason, job.killedSummary)
if (removeStage) {
update(job, now)
} else {
maybeUpdate(job, now)
// Per-executor summary for this stage attempt.
val esummary = stage.executorSummary(event.taskInfo.executorId)
esummary.taskTime += event.taskInfo.duration
esummary.succeededTasks += completedDelta
esummary.failedTasks += failedDelta
esummary.killedTasks += killedDelta
if (metricsDelta != null) {
esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta)
val isLastTask = stage.activeTasksPerExecutor(event.taskInfo.executorId) == 0
// If the last task of the executor finished, then update the esummary
// for both live and history events.
if (isLastTask) {
update(esummary, now)
} else {
maybeUpdate(esummary, now)
// NOTE(review): the cleanup condition below is truncated in this copy; it schedules task
// cleanup via kvstore.doAsync once savedTasks exceeds maxTasksPerStage — confirm.
if (! && stage.savedTasks.get() > maxTasksPerStage) { = true
kvstore.doAsync {
if (removeStage) {
liveStages.remove((event.stageId, event.stageAttemptId))
liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
exec.activeTasks -= 1
exec.completedTasks += completedDelta
exec.failedTasks += failedDelta
exec.totalDuration += event.taskInfo.duration
// Note: For resubmitted tasks, we continue to use the metrics that belong to the
// first attempt of this task. This may not be 100% accurate because the first attempt
// could have failed half-way through. The correct fix would be to keep track of the
// metrics added by each attempt, but this is much more complicated.
if (event.reason != Resubmitted) {
if (event.taskMetrics != null) {
val readMetrics = event.taskMetrics.shuffleReadMetrics
exec.totalGcTime += event.taskMetrics.jvmGCTime
exec.totalInputBytes += event.taskMetrics.inputMetrics.bytesRead
exec.totalShuffleRead += readMetrics.localBytesRead + readMetrics.remoteBytesRead
exec.totalShuffleWrite += event.taskMetrics.shuffleWriteMetrics.bytesWritten
// Force an update on both live and history applications when the number of active tasks
// reaches 0. This is checked in some tests (e.g. SQLTestUtilsBase) so it needs to be
// reliably up to date.
if (exec.activeTasks == 0) {
update(exec, now)
} else {
maybeUpdate(exec, now)
/**
 * Finalizes a stage attempt. It:
 *  - writes its executor summaries and derives the final status (FAILED / COMPLETE / SKIPPED);
 *  - updates the owning jobs and pool, and clears per-stage executor blacklisting;
 *  - drops the stage from the live set when no tasks remain active.
 */
override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
val maybeStage =
Option(liveStages.get((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
maybeStage.foreach { stage =>
// NOTE(review): the next line looks like two statements fused in this copy (timestamp, then
// presumably `stage.info = event.stageInfo`) — confirm.
val now = System.nanoTime() = event.stageInfo
// We have to update the stage status AFTER we create all the executorSummaries
// because stage deletion deletes whatever summaries it finds when the status is completed.
stage.executorSummaries.values.foreach(update(_, now))
// Because of SPARK-20205, old event logs may contain valid stages without a submission time
// in their start event. In those cases, we can only detect whether a stage was skipped by
// waiting until the completion event, at which point the field would have been set.
stage.status = event.stageInfo.failureReason match {
case Some(_) => v1.StageStatus.FAILED
case _ if event.stageInfo.submissionTime.isDefined => v1.StageStatus.COMPLETE
case _ => v1.StageStatus.SKIPPED
// NOTE(review): the next line fuses the end of the match with the job loop header in this copy.
} { job =>
stage.status match {
case v1.StageStatus.COMPLETE =>
job.completedStages += event.stageInfo.stageId
case v1.StageStatus.SKIPPED =>
job.skippedStages += event.stageInfo.stageId
job.skippedTasks += event.stageInfo.numTasks
case _ =>
job.failedStages += 1
job.activeStages -= 1
liveUpdate(job, now)
pools.get(stage.schedulingPool).foreach { pool =>
pool.stageIds = pool.stageIds - event.stageInfo.stageId
update(pool, now)
// The stage is over, so executors blacklisted for it no longer list it.
val executorIdsForStage = stage.blackListedExecutors
executorIdsForStage.foreach { executorId =>
liveExecutors.get(executorId).foreach { exec =>
removeBlackListedStageFrom(exec, event.stageInfo.stageId, now)
// Remove stage only if there are no active tasks remaining
val removeStage = stage.activeTasks == 0
update(stage, now, last = removeStage)
if (removeStage) {
liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber))
if (stage.status == v1.StageStatus.COMPLETE) {
appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
/**
 * Removes `stageId` from the stages the executor is blacklisted in, then writes the executor
 * when running live. The result type is declared explicitly to match addBlackListedStageTo.
 */
private def removeBlackListedStageFrom(exec: LiveExecutor, stageId: Int, now: Long): Unit = {
  exec.blacklistedInStages -= stageId
  liveUpdate(exec, now)
}
/**
 * Registers a block manager's executor and its memory limits. The driver shows up as an
 * executor in the UI but gets no SparkListenerExecutorAdded event, so this also sets fields
 * that onExecutorAdded would normally set.
 */
override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
  val bmId = event.blockManagerId
  val exec = getOrCreateExecutor(bmId.executorId, event.time)
  exec.hostPort = bmId.hostPort

  for (onHeap <- event.maxOnHeapMem) {
    exec.totalOnHeap = onHeap
    // NOTE(review): assumes off-heap max is reported whenever on-heap max is; `.get` throws
    // otherwise — preserved from the original.
    exec.totalOffHeap = event.maxOffHeapMem.get
  }

  exec.isActive = true
  exec.maxMemory = event.maxMem
  liveUpdate(exec, System.nanoTime())
}
/** Intentionally a no-op: executor removal is handled entirely by onExecutorRemoved. */
override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = ()
/**
 * Forgets an unpersisted RDD. It subtracts the RDD's blocks and memory/disk usage from the
 * executors that held them, then deletes the RDD's storage info from the store.
 */
override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
liveRDDs.remove(event.rddId).foreach { liveRDD =>
// NOTE(review): the right-hand side of the next line is truncated in this copy; presumably
// the RDD's storage level, since it selects on-heap vs off-heap accounting below — confirm.
val storageLevel =
// Use RDD partition info to update executor block info.
liveRDD.getPartitions().foreach { case (_, part) =>
part.executors.foreach { executorId =>
liveExecutors.get(executorId).foreach { exec =>
exec.rddBlocks = exec.rddBlocks - 1
val now = System.nanoTime()
// Use RDD distribution to update executor memory and disk usage info.
// addDeltaToValue clamps at zero, so subtracting never produces negative usage.
liveRDD.getDistributions().foreach { case (executorId, rddDist) =>
liveExecutors.get(executorId).foreach { exec =>
if (exec.hasMemoryInfo) {
if (storageLevel.useOffHeap) {
exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, -rddDist.offHeapUsed)
} else {
exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, -rddDist.onHeapUsed)
exec.memoryUsed = addDeltaToValue(exec.memoryUsed, -rddDist.memoryUsed)
exec.diskUsed = addDeltaToValue(exec.diskUsed, -rddDist.diskUsed)
maybeUpdate(exec, now)
kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId)
/**
 * Applies heartbeat accumulator updates. Each running task's metrics delta is folded into the
 * task, its stage attempt and that attempt's per-executor summary. May also flush all live
 * entities if the minimum flush period has elapsed.
 */
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
  val now = System.nanoTime()

  event.accumUpdates.foreach { case (taskId, stageId, stageAttemptId, updates) =>
    for (task <- liveTasks.get(taskId)) {
      val delta = task.updateMetrics(TaskMetrics.fromAccumulatorInfos(updates))
      maybeUpdate(task, now)

      for (stage <- Option(liveStages.get((stageId, stageAttemptId)))) {
        stage.metrics = LiveEntityHelpers.addMetrics(stage.metrics, delta)
        maybeUpdate(stage, now)

        val execSummary = stage.executorSummary(event.execId)
        execSummary.metrics = LiveEntityHelpers.addMetrics(execSummary.metrics, delta)
        maybeUpdate(execSummary, now)
      }
    }
  }

  // Heartbeats arrive periodically, so flushing here keeps UI staleness bounded by
  // `max(heartbeat interval, liveUpdateMinFlushPeriod)`.
  val flushDue = now - lastFlushTimeNs > liveUpdateMinFlushPeriod
  if (flushDue) {
    flush(maybeUpdate(_, now))
    // `flush` may be slow, so take a fresh timestamp instead of reusing the stale `now`.
    lastFlushTimeNs = System.nanoTime()
  }
}
/**
 * Dispatches a block update to the handler for its block kind (RDD, stream or broadcast).
 * Other kinds of block are ignored.
 */
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
  val blockId = event.blockUpdatedInfo.blockId
  blockId match {
    case rddBlock: RDDBlockId => updateRDDBlock(event, rddBlock)
    case streamBlock: StreamBlockId => updateStreamBlock(event, streamBlock)
    case broadcastBlock: BroadcastBlockId => updateBroadcastBlock(event, broadcastBlock)
    case _ =>
  }
}
/** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */
// NOTE(review): only the start of the stage loop is visible in this copy; the rest of the
// body (other entity maps) appears truncated — confirm.
private def flush(entityFlushFunc: LiveEntity => Unit): Unit = {
liveStages.values.asScala.foreach { stage =>
/**
 * Shortcut to get active stages quickly in a live application, for use by the console
 * progress bar.
 */
// NOTE(review): the body of this method is missing in this copy — restore from the original.
def activeStages(): Seq[v1.StageData] = {
/**
 * Applies `delta` to `old` and clamps the result at zero, so usage counters never go negative.
 */
private def addDeltaToValue(old: Long, delta: Long): Long = {
  val updated = old + delta
  if (updated < 0L) 0L else updated
}
/**
 * Applies an RDD block update to the owning executor, the RDD's partition and distribution
 * entries, and the RDD's totals. Memory/disk deltas are positive when the new storage level
 * uses that medium and negative otherwise.
 */
private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = {
val now = System.nanoTime()
val executorId = event.blockUpdatedInfo.blockManagerId.executorId
// Whether values are being added to or removed from the existing accounting.
val storageLevel = event.blockUpdatedInfo.storageLevel
val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1)
val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1)
// NOTE(review): both branches of the next `if` are empty in this copy; the value is used
// below as an Option (defined when the block is still stored) — confirm.
val updatedStorageLevel = if (storageLevel.isValid) {
} else {
// We need information about the executor to update some memory accounting values in the
// RDD info, so read that beforehand.
val maybeExec = liveExecutors.get(executorId)
// +1 when this executor newly holds the block, -1 when it drops it, 0 otherwise.
var rddBlocksDelta = 0
// Update the executor stats first, since they are used to calculate the free memory
// on tracked RDD distributions.
maybeExec.foreach { exec =>
updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta)
// Update the block entry in the RDD info, keeping track of the deltas above so that we
// can update the executor information too.
// NOTE(review): several expressions below (partition lookup, contains-branch, otherRdd id
// check) are truncated in this copy — confirm.
liveRDDs.get(block.rddId).foreach { rdd =>
if (updatedStorageLevel.isDefined) {
val partition = rdd.partition(
val executors = if (updatedStorageLevel.isDefined) {
val current = partition.executors
if (current.contains(executorId)) {
} else {
rddBlocksDelta = 1
current :+ executorId
} else {
rddBlocksDelta = -1
partition.executors.filter(_ != executorId)
// Only update the partition if it's still stored in some executor, otherwise get rid of it.
if (executors.nonEmpty) {
partition.update(executors, rdd.storageLevel,
addDeltaToValue(partition.memoryUsed, memoryDelta),
addDeltaToValue(partition.diskUsed, diskDelta))
} else {
maybeExec.foreach { exec =>
if (exec.rddBlocks + rddBlocksDelta > 0) {
val dist = rdd.distribution(exec)
dist.memoryUsed = addDeltaToValue(dist.memoryUsed, memoryDelta)
dist.diskUsed = addDeltaToValue(dist.diskUsed, diskDelta)
if (exec.hasMemoryInfo) {
if (storageLevel.useOffHeap) {
dist.offHeapUsed = addDeltaToValue(dist.offHeapUsed, memoryDelta)
} else {
dist.onHeapUsed = addDeltaToValue(dist.onHeapUsed, memoryDelta)
dist.lastUpdate = null
} else {
// Trigger an update on other RDDs so that the free memory information is updated.
liveRDDs.values.foreach { otherRdd =>
if ( != block.rddId) {
otherRdd.distributionOpt(exec).foreach { dist =>
dist.lastUpdate = null
update(otherRdd, now)
rdd.memoryUsed = addDeltaToValue(rdd.memoryUsed, memoryDelta)
rdd.diskUsed = addDeltaToValue(rdd.diskUsed, diskDelta)
update(rdd, now)
// Finish updating the executor now that we know the delta in the number of blocks.
maybeExec.foreach { exec =>
exec.rddBlocks += rddBlocksDelta
maybeUpdate(exec, now)
/**
 * Returns the live executor with the given id. If none is tracked yet, a new one is created
 * and activeExecutorCount is incremented.
 */
private def getOrCreateExecutor(executorId: String, addTime: Long): LiveExecutor = {
  // Invoked only when the id is absent, because getOrElseUpdate takes its default by name.
  def register(): LiveExecutor = {
    activeExecutorCount += 1
    new LiveExecutor(executorId, addTime)
  }
  liveExecutors.getOrElseUpdate(executorId, register())
}
/**
 * Stores a streaming block's data when its storage level is valid; otherwise handles the
 * entry keyed by stream id and executor id, presumably deleting it — confirm.
 * NOTE(review): both branches are truncated in this copy.
 */
private def updateStreamBlock(event: SparkListenerBlockUpdated, stream: StreamBlockId): Unit = {
val storageLevel = event.blockUpdatedInfo.storageLevel
if (storageLevel.isValid) {
val data = new StreamBlockData(,
} else {
Array(, event.blockUpdatedInfo.blockManagerId.executorId))
/**
 * Applies a broadcast block update to the holding executor's memory/disk usage, if that
 * executor is still live. Usage grows when the new storage level uses a medium and shrinks
 * otherwise.
 */
private def updateBroadcastBlock(
    event: SparkListenerBlockUpdated,
    broadcast: BroadcastBlockId): Unit = {
  val info = event.blockUpdatedInfo
  for (exec <- liveExecutors.get(info.blockManagerId.executorId)) {
    val now = System.nanoTime()
    val level = info.storageLevel
    // Whether values are being added to or removed from the existing accounting.
    val diskSign = if (level.useDisk) 1 else -1
    val memorySign = if (level.useMemory) 1 else -1
    val diskDelta = info.diskSize * diskSign
    val memoryDelta = info.memSize * memorySign
    updateExecutorMemoryDiskInfo(exec, level, memoryDelta, diskDelta)
    maybeUpdate(exec, now)
  }
}
/**
 * Adds signed memory and disk deltas to an executor's usage counters, clamping at zero.
 * On-heap vs off-heap usage is tracked only when the executor reports memory info; total
 * memory and disk usage are always updated.
 */
private def updateExecutorMemoryDiskInfo(
    exec: LiveExecutor,
    storageLevel: StorageLevel,
    memoryDelta: Long,
    diskDelta: Long): Unit = {
  if (exec.hasMemoryInfo && storageLevel.useOffHeap) {
    exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta)
  } else if (exec.hasMemoryInfo) {
    exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta)
  }
  exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta)
  exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta)
}
/**
 * Returns the live stage for (stageId, attemptNumber), atomically creating it if absent.
 * NOTE(review): the last line is fused in this copy; presumably it assigns `info` to the
 * stage and returns the stage — confirm.
 */
private def getOrCreateStage(info: StageInfo): LiveStage = {
val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber),
new Function[(Int, Int), LiveStage]() {
override def apply(key: (Int, Int)): LiveStage = new LiveStage()
}) = info
/**
 * Returns `oldSummary` with the count for this task-end reason incremented.
 *
 * Killed tasks are keyed by their kill reason; commit-denied tasks by their error string.
 * Any other reason leaves the summary unchanged. (Previously that fallback branch produced no
 * value, so the method could not return the required Map.)
 *
 * @param reason     why the task ended
 * @param oldSummary current reason -> count tally
 * @return the updated tally (a new immutable Map)
 */
private def killedTasksSummary(
    reason: TaskEndReason,
    oldSummary: Map[String, Int]): Map[String, Int] = {
  def increment(key: String): Map[String, Int] =
    oldSummary.updated(key, oldSummary.getOrElse(key, 0) + 1)

  reason match {
    case k: TaskKilled => increment(k.reason)
    case denied: TaskCommitDenied => increment(denied.toErrorString)
    case _ => oldSummary
  }
}
/**
 * Writes `entity` to the store immediately. `last` is forwarded as `checkTriggers`, so the
 * store's retention triggers are evaluated only on an entity's last write.
 */
private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit =
  entity.write(kvstore, now, checkTriggers = last)
/**
 * Writes `entity` only when all three hold:
 *  - the application is live;
 *  - periodic updates are enabled (`liveUpdatePeriodNs >= 0`);
 *  - more than `liveUpdatePeriodNs` has passed since the entity's last write.
 */
private def maybeUpdate(entity: LiveEntity, now: Long): Unit = {
  val periodicUpdatesEnabled = live && liveUpdatePeriodNs >= 0
  if (periodicUpdatesEnabled && now - entity.lastWriteTime > liveUpdatePeriodNs) {
    update(entity, now)
  }
}
/**
 * Writes `entity` unconditionally in a live application. When replaying logs it does nothing,
 * which avoids redundant intermediate writes.
 */
private def liveUpdate(entity: LiveEntity, now: Long): Unit =
  if (live) update(entity, now)
/**
 * Retention trigger for executor summaries. Deletes dead executors from the store once their
 * number (stored count minus active executors) exceeds MAX_RETAINED_DEAD_EXECUTORS.
 */
private def cleanupExecutors(count: Long): Unit = {
// Because the limit is on the number of *dead* executors, we need to calculate whether
// there are actually enough dead executors to be deleted.
val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
val dead = count - activeExecutorCount
if (dead > threshold) {
val countToDelete = calculateNumberToRemove(dead, threshold)
// NOTE(review): the view query and delete key below are truncated in this copy; presumably
// inactive executors are selected, up to countToDelete — confirm.
val toDelete = kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
toDelete.foreach { e => kvstore.delete(e.getClass(), }
/**
 * Retention trigger for jobs: deletes finished jobs (neither RUNNING nor UNKNOWN) in
 * completion-time order once the count exceeds MAX_RETAINED_JOBS.
 */
private def cleanupJobs(count: Long): Unit = {
val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS))
// NOTE(review): this guard's body is missing in this copy; presumably an early return when
// there is nothing to delete — confirm.
if (countToDelete <= 0L) {
val view = kvstore.view(classOf[JobDataWrapper]).index("completionTime").first(0L)
// NOTE(review): the filter predicate and delete key below are truncated in this copy.
val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j => != JobExecutionStatus.RUNNING && != JobExecutionStatus.UNKNOWN
toDelete.foreach { j => kvstore.delete(j.getClass(), }
/**
 * Retention trigger for stages. Once the count exceeds MAX_RETAINED_STAGES, it deletes
 * finished stage attempts (neither ACTIVE nor PENDING) in completion-time order, then bulk-
 * deletes their executor summaries and tasks.
 */
private def cleanupStages(count: Long): Unit = {
val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES))
// NOTE(review): this guard's body is missing in this copy; presumably an early return when
// there is nothing to delete — confirm.
if (countToDelete <= 0L) {
// As the completion time of a skipped stage is always -1, we will remove skipped stages first.
// This is safe since the job itself contains enough information to render skipped stages in the
// UI.
val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime")
// NOTE(review): the predicates, key construction and attempt comparison below are truncated
// in this copy — confirm.
val stages = KVUtils.viewToSeq(view, countToDelete.toInt) { s => != v1.StageStatus.ACTIVE && != v1.StageStatus.PENDING
val stageIds = { s =>
val key = Array(,
kvstore.delete(s.getClass(), key)
// Check whether there are remaining attempts for the same stage. If there aren't, then
// also delete the RDD graph data.
val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
val hasMoreAttempts = try {
remainingAttempts.asScala.exists { other => !=
} finally {
if (!hasMoreAttempts) {
// Delete summaries in one pass, as deleting them for each stage is slow
kvstore.removeAllByIndexValues(classOf[ExecutorStageSummaryWrapper], "stage", stageIds)
// Delete tasks for all stages in one pass, as deleting them for each stage individually is slow
kvstore.removeAllByIndexValues(classOf[TaskDataWrapper], TaskIndexNames.STAGE, stageIds)
/**
 * Trims a stage's stored tasks down toward maxTasksPerStage. When live, non-running tasks are
 * deleted first; running tasks go only if still over the limit. In live apps it also clears
 * cached quantiles for the stage.
 */
private def cleanupTasks(stage: LiveStage): Unit = {
val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt
if (countToDelete > 0) {
// NOTE(review): the stage key and the view's index/parent selection are truncated in this copy.
val stageKey = Array(,
val view = kvstore.view(classOf[TaskDataWrapper])
// Try to delete finished tasks only.
val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
!live || t.status != TaskState.RUNNING.toString()
toDelete.foreach { t => kvstore.delete(t.getClass(), t.taskId) }
// If there are more running tasks than the configured limit, delete running tasks. This
// should be extremely rare since the limit should generally far exceed the number of tasks
// that can run in parallel.
val remaining = countToDelete - toDelete.size
if (remaining > 0) {
val runningTasksToDelete = view.max(remaining).iterator().asScala.toList
runningTasksToDelete.foreach { t => kvstore.delete(t.getClass(), t.taskId) }
// On live applications, cleanup any cached quantiles for the stage. This makes sure that
// quantiles will be recalculated after tasks are replaced with newer ones.
// This is not needed in the SHS since caching only happens after the event logs are
// completely processed.
// NOTE(review): the body of the next `if` and the flag reset after it are truncated in this copy.
if (live) {
} = false
/**
 * Removes cached task-metric quantiles for the given stage key.
 * NOTE(review): the view's index selection and the loop body are truncated in this copy.
 */
private def cleanupCachedQuantiles(stageKey: Array[Int]): Unit = {
val cachedQuantiles = kvstore.view(classOf[CachedQuantile])
cachedQuantiles.foreach { q =>
/**
 * Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done
 * asynchronously, this method may return 0 if enough items have already been deleted.
 */
private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = {
if (dataSize > retainedSize) {
math.max(retainedSize / 10L, dataSize - retainedSize)
} else {