| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark |
| |
| import java.util.concurrent.TimeUnit |
| |
| import scala.collection.mutable |
| import scala.collection.mutable.ArrayBuffer |
| import scala.util.control.{ControlThrowable, NonFatal} |
| |
| import com.codahale.metrics.{Gauge, MetricRegistry} |
| |
| import org.apache.spark.internal.{config, Logging} |
| import org.apache.spark.internal.config._ |
| import org.apache.spark.metrics.source.Source |
| import org.apache.spark.scheduler._ |
| import org.apache.spark.storage.BlockManagerMaster |
| import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} |
| |
| /** |
| * An agent that dynamically allocates and removes executors based on the workload. |
| * |
| * The ExecutorAllocationManager maintains a moving target number of executors which is periodically |
| * synced to the cluster manager. The target starts at a configured initial value and changes with |
| * the number of pending and running tasks. |
| * |
| * Decreasing the target number of executors happens when the current target is more than needed to |
| * handle the current load. The target number of executors is always truncated to the number of |
| * executors that could run all current running and pending tasks at once. |
| * |
| * Increasing the target number of executors happens in response to backlogged tasks waiting to be |
| * scheduled. If the scheduler queue is not drained in M seconds, then new executors are added. If |
| * the queue persists for another N seconds, then more executors are added and so on. The number |
| * added in each round increases exponentially from the previous round until an upper bound has been |
| * reached. The upper bound is based both on a configured property and on the current number of |
| * running and pending tasks, as described above. |
| * |
| * The rationale for the exponential increase is twofold: (1) Executors should be added slowly |
| * in the beginning in case the number of extra executors needed turns out to be small. Otherwise, |
| * we may add more executors than we need just to remove them later. (2) Executors should be added |
| * quickly over time in case the maximum number of executors is very high. Otherwise, it will take |
| * a long time to ramp up under heavy workloads. |
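| * |
| * For example, under a sustained backlog successive rounds request 1, 2, 4, 8, ... additional |
| * executors (the round size doubles each time), until the request is truncated by the |
| * configured maximum or by the number of executors needed for the current load. |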
| * |
| * The remove policy is simpler: If an executor has been idle for K seconds, meaning it has not |
| * been scheduled to run any tasks, then it is removed. |
| * |
| * There is no retry logic in either case because we assume the cluster manager will |
| * eventually fulfill all requests it receives asynchronously. |
| * |
| * The relevant Spark properties include the following: |
| * |
| * spark.dynamicAllocation.enabled - Whether this feature is enabled |
| * spark.dynamicAllocation.minExecutors - Lower bound on the number of executors |
| * spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors |
| * spark.dynamicAllocation.initialExecutors - Number of executors to start with |
| * |
| * spark.dynamicAllocation.executorAllocationRatio - |
| * Scales down the executor target relative to full task parallelism, since requesting |
| * one task slot per pending task can waste resources when tasks are small |
| * |
| * spark.dynamicAllocation.schedulerBacklogTimeout (M) - |
| * If there are backlogged tasks for this duration, add new executors |
| * |
| * spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (N) - |
| * If the backlog is sustained for this duration, add more executors |
| * This is used only after the initial backlog timeout is exceeded |
| * |
| * spark.dynamicAllocation.executorIdleTimeout (K) - |
| * If an executor has been idle for this duration, remove it |
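| * |
| * As an illustrative example (values are not prescriptive), a minimal configuration enabling |
| * this feature might look like: |
| * |
| * {{{ |
| *   spark.dynamicAllocation.enabled          true |
| *   spark.shuffle.service.enabled            true |
| *   spark.dynamicAllocation.minExecutors     2 |
| *   spark.dynamicAllocation.maxExecutors     20 |
| *   spark.dynamicAllocation.initialExecutors 4 |
| * }}} |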
| */ |
| private[spark] class ExecutorAllocationManager( |
| client: ExecutorAllocationClient, |
| listenerBus: LiveListenerBus, |
| conf: SparkConf, |
| blockManagerMaster: BlockManagerMaster) |
| extends Logging { |
| |
| allocationManager => |
| |
| import ExecutorAllocationManager._ |
| |
| // Lower and upper bounds on the number of executors. |
| private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS) |
| private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS) |
| private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf) |
| |
| // How long tasks must remain backlogged before an addition is triggered (seconds) |
| private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds( |
| "spark.dynamicAllocation.schedulerBacklogTimeout", "1s") |
| |
| // Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded |
| private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds( |
| "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", s"${schedulerBacklogTimeoutS}s") |
| |
| // How long an executor must be idle before it is removed (seconds) |
| private val executorIdleTimeoutS = conf.getTimeAsSeconds( |
| "spark.dynamicAllocation.executorIdleTimeout", "60s") |
| |
| private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds( |
| "spark.dynamicAllocation.cachedExecutorIdleTimeout", s"${Integer.MAX_VALUE}s") |
| |
| // During testing, the methods to actually kill and add executors are mocked out |
| private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false) |
| |
| // TODO: The default value of 1 for spark.executor.cores works right now because dynamic |
| // allocation is only supported for YARN and the default number of cores per executor in YARN is |
| // 1, but it might need to be obtained differently for other cluster managers |
| private val tasksPerExecutorForFullParallelism = |
| conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1) |
| |
| private val executorAllocationRatio = |
| conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO) |
| |
| validateSettings() |
| |
| // Number of executors to add in the next round |
| private var numExecutorsToAdd = 1 |
| |
| // The desired number of executors at this moment in time. If all our executors were to die, this |
| // is the number of executors we would immediately want from the cluster manager. |
| private var numExecutorsTarget = initialNumExecutors |
| |
| // Executors that have been requested to be removed but have not been killed yet |
| private val executorsPendingToRemove = new mutable.HashSet[String] |
| |
| // All known executors |
| private val executorIds = new mutable.HashSet[String] |
| |
| // A timestamp of when an addition should be triggered, or NOT_SET if it is not set |
| // This is set when pending tasks are added but not scheduled yet |
| private var addTime: Long = NOT_SET |
| |
| // A timestamp for each executor of when the executor should be removed, indexed by the ID |
| // This is set when an executor is no longer running a task, or when it first registers |
| private val removeTimes = new mutable.HashMap[String, Long] |
| |
| // Polling loop interval (ms) |
| private val intervalMillis: Long = if (Utils.isTesting) { |
| conf.getLong(TESTING_SCHEDULE_INTERVAL_KEY, 100) |
| } else { |
| 100 |
| } |
| |
| // Clock used to schedule when executors should be added and removed |
| private var clock: Clock = new SystemClock() |
| |
| // Listener for Spark events that impact the allocation policy |
| val listener = new ExecutorAllocationListener |
| |
| // Executor that handles the scheduling task. |
| private val executor = |
| ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation") |
| |
| // Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem. |
| val executorAllocationManagerSource = new ExecutorAllocationManagerSource |
| |
| // Whether we are still waiting for the initial set of executors to be allocated. |
| // While this is true, we will not cancel outstanding executor requests. This is |
| // set to false when: |
| // (1) a stage is submitted, or |
| // (2) an executor idle timeout has elapsed. |
| @volatile private var initializing: Boolean = true |
| |
| // Number of locality aware tasks, used for executor placement. |
| private var localityAwareTasks = 0 |
| |
| // Map from each host to the number of tasks that would like to run on it, used for placement |
| private var hostToLocalTaskCount: Map[String, Int] = Map.empty |
| |
| /** |
| * Verify that the settings specified through the config are valid. |
| * If not, throw an appropriate exception. |
| */ |
| private def validateSettings(): Unit = { |
| if (minNumExecutors < 0 || maxNumExecutors < 0) { |
| throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be positive!") |
| } |
| if (maxNumExecutors == 0) { |
| throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!") |
| } |
| if (minNumExecutors > maxNumExecutors) { |
| throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " + |
| s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!") |
| } |
| if (schedulerBacklogTimeoutS <= 0) { |
| throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!") |
| } |
| if (sustainedSchedulerBacklogTimeoutS <= 0) { |
| throw new SparkException( |
| "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!") |
| } |
| if (executorIdleTimeoutS < 0) { |
| throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be >= 0!") |
| } |
| if (cachedExecutorIdleTimeoutS < 0) { |
| throw new SparkException("spark.dynamicAllocation.cachedExecutorIdleTimeout must be >= 0!") |
| } |
| // Require external shuffle service for dynamic allocation |
| // Otherwise, we may lose shuffle files when killing executors |
| if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) { |
| throw new SparkException("Dynamic allocation of executors requires the external " + |
| "shuffle service. You may enable this through spark.shuffle.service.enabled.") |
| } |
| if (tasksPerExecutorForFullParallelism == 0) { |
| throw new SparkException("spark.executor.cores must not be < spark.task.cpus.") |
| } |
| |
| if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) { |
| throw new SparkException( |
| "spark.dynamicAllocation.executorAllocationRatio must be > 0 and <= 1.0") |
| } |
| } |
| |
| /** |
| * Use a different clock for this allocation manager. This is mainly used for testing. |
| */ |
| def setClock(newClock: Clock): Unit = { |
| clock = newClock |
| } |
| |
| /** |
| * Register for scheduler callbacks to decide when to add and remove executors, and start |
| * the scheduling task. |
| */ |
| def start(): Unit = { |
| listenerBus.addToManagementQueue(listener) |
| |
| val scheduleTask = new Runnable() { |
| override def run(): Unit = { |
| try { |
| schedule() |
| } catch { |
| case ct: ControlThrowable => |
| throw ct |
| case t: Throwable => |
| logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) |
| } |
| } |
| } |
| executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS) |
| |
| client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) |
| } |
| |
| /** |
| * Stop the allocation manager. |
| */ |
| def stop(): Unit = { |
| executor.shutdown() |
| executor.awaitTermination(10, TimeUnit.SECONDS) |
| } |
| |
| /** |
| * Reset the allocation manager when the cluster manager loses track of the driver's state. |
| * This is currently only done in YARN client mode, when the AM is restarted. |
| * |
| * This method forgets all state about existing executors, and forces the scheduler to |
| * re-evaluate the number of needed executors the next time it runs. |
| */ |
| def reset(): Unit = synchronized { |
| addTime = 0L |
| numExecutorsTarget = initialNumExecutors |
| executorsPendingToRemove.clear() |
| removeTimes.clear() |
| } |
| |
| /** |
| * The maximum number of executors we would need under the current load to satisfy all running |
| * and pending tasks, rounded up. |
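| * |
| * For example (illustrative numbers): 90 running and pending tasks with |
| * executorAllocationRatio = 1.0 and 4 task slots per executor yield ceil(90 / 4) = 23. |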
| */ |
| private def maxNumExecutorsNeeded(): Int = { |
| val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks |
| math.ceil(numRunningOrPendingTasks * executorAllocationRatio / |
| tasksPerExecutorForFullParallelism) |
| .toInt |
| } |
| |
| private def totalRunningTasks(): Int = synchronized { |
| listener.totalRunningTasks |
| } |
| |
| /** |
| * This is called at a fixed interval to regulate the number of pending executor requests |
| * and the number of running executors. |
| * |
| * First, adjust our requested executors based on the add time and our current needs. |
| * Then, if the remove time for an existing executor has expired, kill the executor. |
| * |
| * This is factored out into its own method for testing. |
| */ |
| private def schedule(): Unit = synchronized { |
| val now = clock.getTimeMillis |
| |
| val executorIdsToBeRemoved = ArrayBuffer[String]() |
| removeTimes.retain { case (executorId, expireTime) => |
| val expired = now >= expireTime |
| if (expired) { |
| initializing = false |
| executorIdsToBeRemoved += executorId |
| } |
| !expired |
| } |
| // Update the executor target number only once the initializing flag has been unset |
| updateAndSyncNumExecutorsTarget(now) |
| if (executorIdsToBeRemoved.nonEmpty) { |
| removeExecutors(executorIdsToBeRemoved) |
| } |
| } |
| |
| /** |
| * Updates our target number of executors and syncs the result with the cluster manager. |
| * |
| * Check to see whether our existing allocation and the requests we've made previously exceed our |
| * current needs. If so, truncate our target and let the cluster manager know so that it can |
| * cancel pending requests that are unneeded. |
| * |
| * If not, and the add time has expired, see if we can request new executors and refresh the add |
| * time. |
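| * |
| * For example (illustrative numbers): if numExecutorsTarget is 10 while only 4 executors are |
| * needed for the current load and minNumExecutors is 2, the target is lowered to 4 and this |
| * method returns -6. |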
| * |
| * @return the delta in the target number of executors. |
| */ |
| private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized { |
| val maxNeeded = maxNumExecutorsNeeded |
| |
| if (initializing) { |
| // Do not change our target while we are still initializing; |
| // otherwise the first job may have to ramp up unnecessarily |
| 0 |
| } else if (maxNeeded < numExecutorsTarget) { |
| // The target number exceeds the number we actually need, so stop adding new |
| // executors and inform the cluster manager to cancel the extra pending requests |
| val oldNumExecutorsTarget = numExecutorsTarget |
| numExecutorsTarget = math.max(maxNeeded, minNumExecutors) |
| numExecutorsToAdd = 1 |
| |
| // If the new target has not changed, avoid sending a message to the cluster manager |
| if (numExecutorsTarget < oldNumExecutorsTarget) { |
| // We lower the target number of executors but don't actively kill any yet. Killing is |
| // controlled separately by an idle timeout. It's still helpful to reduce the target number |
| // in case an executor just happens to get lost (eg., bad hardware, or the cluster manager |
| // preempts it) -- in that case, there is no point in trying to immediately get a new |
| // executor, since we wouldn't even use it yet. |
| client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) |
| logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " + |
| s"$oldNumExecutorsTarget) because not all requested executors are actually needed") |
| } |
| numExecutorsTarget - oldNumExecutorsTarget |
| } else if (addTime != NOT_SET && now >= addTime) { |
| val delta = addExecutors(maxNeeded) |
| logDebug(s"Starting timer to add more executors (to " + |
| s"expire in $sustainedSchedulerBacklogTimeoutS seconds)") |
| addTime = now + (sustainedSchedulerBacklogTimeoutS * 1000) |
| delta |
| } else { |
| 0 |
| } |
| } |
| |
| /** |
| * Request a number of executors from the cluster manager. |
| * If the cap on the number of executors is reached, give up and reset the |
| * number of executors to add next round instead of continuing to double it. |
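| * |
| * Illustrative trace, assuming each request is fully granted and no bound is hit: successive |
| * rounds request deltas of 1, 2, 4, 8, ... because numExecutorsToAdd doubles whenever the |
| * previous delta was granted in full, and resets to 1 once a bound truncates a request. |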
| * |
| * @param maxNumExecutorsNeeded the maximum number of executors all currently running or pending |
| * tasks could fill |
| * @return the number of additional executors actually requested. |
| */ |
| private def addExecutors(maxNumExecutorsNeeded: Int): Int = { |
| // Do not request more executors if it would put our target over the upper bound |
| if (numExecutorsTarget >= maxNumExecutors) { |
| logDebug(s"Not adding executors because our current target total " + |
| s"is already $numExecutorsTarget (limit $maxNumExecutors)") |
| numExecutorsToAdd = 1 |
| return 0 |
| } |
| |
| val oldNumExecutorsTarget = numExecutorsTarget |
| // There's no point in wasting time ramping up to the number of executors we already have, so |
| // make sure our target is at least as much as our current allocation: |
| numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size) |
| // Boost our target with the number to add for this round: |
| numExecutorsTarget += numExecutorsToAdd |
| // Ensure that our target doesn't exceed what we need at the present moment: |
| numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded) |
| // Ensure that our target fits within configured bounds: |
| numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors) |
| |
| val delta = numExecutorsTarget - oldNumExecutorsTarget |
| |
| // If our target has not changed, do not send a message |
| // to the cluster manager and reset our exponential growth |
| if (delta == 0) { |
| // Check whether any speculative tasks are pending |
| if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 0) { |
| numExecutorsTarget = |
| math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors) |
| } else { |
| numExecutorsToAdd = 1 |
| return 0 |
| } |
| } |
| |
| val addRequestAcknowledged = try { |
| testing || |
| client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) |
| } catch { |
| case NonFatal(e) => |
| // Use INFO level so the error doesn't show up by default in shells. Errors here are most |
| // commonly caused by YARN AM restarts, which are recoverable and would otherwise generate a |
| // lot of noisy output. |
| logInfo("Error reaching cluster manager.", e) |
| false |
| } |
| if (addRequestAcknowledged) { |
| val executorsString = "executor" + { if (delta > 1) "s" else "" } |
| logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" + |
| s" (new desired total will be $numExecutorsTarget)") |
| numExecutorsToAdd = if (delta == numExecutorsToAdd) { |
| numExecutorsToAdd * 2 |
| } else { |
| 1 |
| } |
| delta |
| } else { |
| logWarning( |
| s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!") |
| numExecutorsTarget = oldNumExecutorsTarget |
| 0 |
| } |
| } |
| |
| /** |
| * Request the cluster manager to remove the given executors. |
| * Returns the list of executors which are removed. |
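| * |
| * For example (illustrative numbers): with 10 known executors, 2 of which are already pending |
| * removal, minNumExecutors = 4 and numExecutorsTarget = 6, at most 2 of the given candidates |
| * are removed, since the remaining total may not drop below the target. |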
| */ |
| private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized { |
| val executorIdsToBeRemoved = new ArrayBuffer[String] |
| |
| logInfo("Request to remove executorIds: " + executors.mkString(", ")) |
| val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size |
| |
| var newExecutorTotal = numExistingExecutors |
| executors.foreach { executorIdToBeRemoved => |
| if (newExecutorTotal - 1 < minNumExecutors) { |
| logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " + |
| s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)") |
| } else if (newExecutorTotal - 1 < numExecutorsTarget) { |
| logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " + |
| s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)") |
| } else if (canBeKilled(executorIdToBeRemoved)) { |
| executorIdsToBeRemoved += executorIdToBeRemoved |
| newExecutorTotal -= 1 |
| } |
| } |
| |
| if (executorIdsToBeRemoved.isEmpty) { |
| return Seq.empty[String] |
| } |
| |
| // Send a request to the backend to kill the given executor(s) |
| val executorsRemoved = if (testing) { |
| executorIdsToBeRemoved |
| } else { |
| // We don't want to change our target number of executors, because we already did that |
| // when the task backlog decreased. |
| client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false, |
| countFailures = false, force = false) |
| } |
| // [SPARK-21834] killExecutors api reduces the target number of executors. |
| // So we need to update the target with desired value. |
| client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) |
| // Reset newExecutorTotal to the current number of existing executors |
| newExecutorTotal = numExistingExecutors |
| if (testing || executorsRemoved.nonEmpty) { |
| executorsRemoved.foreach { removedExecutorId => |
| // If the executor has cached blocks, cachedExecutorIdleTimeoutS is used as the timeout |
| val idleTimeout = if (blockManagerMaster.hasCachedBlocks(removedExecutorId)) { |
| cachedExecutorIdleTimeoutS |
| } else { |
| executorIdleTimeoutS |
| } |
| newExecutorTotal -= 1 |
| logInfo(s"Removing executor $removedExecutorId because it has been idle for " + |
| s"$idleTimeout seconds (new desired total will be $newExecutorTotal)") |
| executorsPendingToRemove.add(removedExecutorId) |
| } |
| executorsRemoved |
| } else { |
| logWarning(s"Unable to reach the cluster manager to kill executor/s " + |
| s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!") |
| Seq.empty[String] |
| } |
| } |
| |
| /** |
| * Request the cluster manager to remove the given executor. |
| * Return whether the request is acknowledged. |
| */ |
| private def removeExecutor(executorId: String): Boolean = synchronized { |
| val executorsRemoved = removeExecutors(Seq(executorId)) |
| executorsRemoved.nonEmpty && executorsRemoved(0) == executorId |
| } |
| |
| /** |
| * Determine if the given executor can be killed. |
| */ |
| private def canBeKilled(executorId: String): Boolean = synchronized { |
| // Do not kill the executor if we are not aware of it (should never happen) |
| if (!executorIds.contains(executorId)) { |
| logWarning(s"Attempted to remove unknown executor $executorId!") |
| return false |
| } |
| |
| // Do not kill the executor again if it is already pending to be killed (should never happen) |
| if (executorsPendingToRemove.contains(executorId)) { |
| logWarning(s"Attempted to remove executor $executorId " + |
| s"when it is already pending to be removed!") |
| return false |
| } |
| |
| true |
| } |
| |
| /** |
| * Callback invoked when the specified executor has been added. |
| */ |
| private def onExecutorAdded(executorId: String): Unit = synchronized { |
| if (!executorIds.contains(executorId)) { |
| executorIds.add(executorId) |
| // If an executor (call this executor X) is not removed because the lower bound |
| // has been reached, it will no longer be marked as idle. When new executors join, |
| // however, we are no longer at the lower bound, and so we must mark executor X |
| // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951) |
| executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle) |
| logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})") |
| } else { |
| logWarning(s"Duplicate executor $executorId has registered") |
| } |
| } |
| |
| /** |
| * Callback invoked when the specified executor has been removed. |
| */ |
| private def onExecutorRemoved(executorId: String): Unit = synchronized { |
| if (executorIds.contains(executorId)) { |
| executorIds.remove(executorId) |
| removeTimes.remove(executorId) |
| logInfo(s"Existing executor $executorId has been removed (new total is ${executorIds.size})") |
| if (executorsPendingToRemove.contains(executorId)) { |
| executorsPendingToRemove.remove(executorId) |
| logDebug(s"Executor $executorId is no longer pending to " + |
| s"be removed (${executorsPendingToRemove.size} left)") |
| } |
| } else { |
| logWarning(s"Unknown executor $executorId has been removed!") |
| } |
| } |
| |
| /** |
| * Callback invoked when the scheduler receives new pending tasks. |
| * If not already set, this sets a time in the future at which executors should be added. |
| */ |
| private def onSchedulerBacklogged(): Unit = synchronized { |
| if (addTime == NOT_SET) { |
| logDebug(s"Starting timer to add executors because pending tasks " + |
| s"are building up (to expire in $schedulerBacklogTimeoutS seconds)") |
| addTime = clock.getTimeMillis + schedulerBacklogTimeoutS * 1000 |
| } |
| } |
| |
| /** |
| * Callback invoked when the scheduler queue is drained. |
| * This resets all variables used for adding executors. |
| */ |
| private def onSchedulerQueueEmpty(): Unit = synchronized { |
| logDebug("Clearing timer to add executors because there are no more pending tasks") |
| addTime = NOT_SET |
| numExecutorsToAdd = 1 |
| } |
| |
| /** |
| * Callback invoked when the specified executor is no longer running any tasks. |
| * If the executor is not already marked as idle, this sets a time in the future at which |
| * it should be removed. |
| */ |
| private def onExecutorIdle(executorId: String): Unit = synchronized { |
| if (executorIds.contains(executorId)) { |
| if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) { |
| // It is not necessary to query the executors, since all the cached blocks we are |
| // concerned with are reported to the driver. Note that this does not include |
| // broadcast blocks. |
| val hasCachedBlocks = blockManagerMaster.hasCachedBlocks(executorId) |
| val now = clock.getTimeMillis() |
| val timeout = { |
| if (hasCachedBlocks) { |
| // Use a different timeout if the executor has cached blocks. |
| now + cachedExecutorIdleTimeoutS * 1000 |
| } else { |
| now + executorIdleTimeoutS * 1000 |
| } |
| } |
| val realTimeout = if (timeout <= 0) Long.MaxValue else timeout // guard against Long overflow |
| removeTimes(executorId) = realTimeout |
| logDebug(s"Starting idle timer for $executorId because there are no more tasks " + |
| s"scheduled to run on the executor (to expire in ${(realTimeout - now)/1000} seconds)") |
| } |
| } else { |
| logWarning(s"Attempted to mark unknown executor $executorId idle") |
| } |
| } |
| |
| /** |
| * Callback invoked when the specified executor is now running a task. |
| * This resets all variables used for removing this executor. |
| */ |
| private def onExecutorBusy(executorId: String): Unit = synchronized { |
| logDebug(s"Clearing idle timer for $executorId because it is now running a task") |
| removeTimes.remove(executorId) |
| } |
| |
| /** |
| * A listener that notifies the given allocation manager of when to add and remove executors. |
| * |
| * This class is intentionally conservative in its assumptions about the relative ordering |
| * and consistency of events returned by the listener. |
| */ |
| private[spark] class ExecutorAllocationListener extends SparkListener { |
| |
| private val stageIdToNumTasks = new mutable.HashMap[Int, Int] |
| // Number of running tasks per stage including speculative tasks. |
| // Should be 0 when no stages are active. |
| private val stageIdToNumRunningTask = new mutable.HashMap[Int, Int] |
| private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]] |
| private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]] |
| // Number of speculative tasks to be scheduled in each stage |
| private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int] |
| // The speculative tasks started in each stage |
| private val stageIdToSpeculativeTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]] |
| |
| // Map from stageId to a tuple of (the number of tasks with locality preferences, a map from |
| // host to the number of tasks that would like to be scheduled on that host). This maintains |
| // the executor placement hints for each stage, which the resource framework uses to better |
| // place executors. |
| private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] |
| |
| override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { |
| initializing = false |
| val stageId = stageSubmitted.stageInfo.stageId |
| val numTasks = stageSubmitted.stageInfo.numTasks |
| allocationManager.synchronized { |
| stageIdToNumTasks(stageId) = numTasks |
| stageIdToNumRunningTask(stageId) = 0 |
| allocationManager.onSchedulerBacklogged() |
| |
| // Compute the number of tasks requested by the stage on each host |
| var numTasksPending = 0 |
| val hostToLocalTaskCountPerStage = new mutable.HashMap[String, Int]() |
| stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality => |
| if (!locality.isEmpty) { |
| numTasksPending += 1 |
| locality.foreach { location => |
| val count = hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1 |
| hostToLocalTaskCountPerStage(location.host) = count |
| } |
| } |
| } |
| stageIdToExecutorPlacementHints.put(stageId, |
| (numTasksPending, hostToLocalTaskCountPerStage.toMap)) |
| |
| // Update the executor placement hints |
| updateExecutorPlacementHints() |
| } |
| } |
| |
| override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { |
| val stageId = stageCompleted.stageInfo.stageId |
| allocationManager.synchronized { |
| stageIdToNumTasks -= stageId |
| stageIdToNumRunningTask -= stageId |
| stageIdToNumSpeculativeTasks -= stageId |
| stageIdToTaskIndices -= stageId |
| stageIdToSpeculativeTaskIndices -= stageId |
| stageIdToExecutorPlacementHints -= stageId |
| |
| // Update the executor placement hints |
| updateExecutorPlacementHints() |
| |
| // If this is the last stage with pending tasks, mark the scheduler queue as empty |
| // This is needed in case the stage is aborted for any reason |
| if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) { |
| allocationManager.onSchedulerQueueEmpty() |
| } |
| } |
| } |
| |
| override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { |
| val stageId = taskStart.stageId |
| val taskId = taskStart.taskInfo.taskId |
| val taskIndex = taskStart.taskInfo.index |
| val executorId = taskStart.taskInfo.executorId |
| |
| allocationManager.synchronized { |
| if (stageIdToNumRunningTask.contains(stageId)) { |
| stageIdToNumRunningTask(stageId) += 1 |
| } |
| // This guards against the following race condition: |
| // 1. The `SparkListenerTaskStart` event is posted before the |
| // `SparkListenerExecutorAdded` event |
| // 2. The `SparkListenerExecutorRemoved` event is posted before the |
| // `SparkListenerTaskStart` event |
| // Above cases are possible because these events are posted in different threads. |
| // (see SPARK-4951 SPARK-26927) |
| if (!allocationManager.executorIds.contains(executorId) && |
| client.getExecutorIds().contains(executorId)) { |
| allocationManager.onExecutorAdded(executorId) |
| } |
| |
| if (taskStart.taskInfo.speculative) { |
| stageIdToSpeculativeTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += |
| taskIndex |
| } else { |
| stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex |
| } |
| // If this was the last pending task, mark the scheduler queue as empty |
| if (totalPendingTasks() == 0) { |
| allocationManager.onSchedulerQueueEmpty() |
| } |
| |
| // Mark the executor on which this task is scheduled as busy |
| executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId |
| allocationManager.onExecutorBusy(executorId) |
| } |
| } |
| |
| override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { |
| val executorId = taskEnd.taskInfo.executorId |
| val taskId = taskEnd.taskInfo.taskId |
| val taskIndex = taskEnd.taskInfo.index |
| val stageId = taskEnd.stageId |
| allocationManager.synchronized { |
| if (stageIdToNumRunningTask.contains(stageId)) { |
| stageIdToNumRunningTask(stageId) -= 1 |
| } |
| // If the executor is no longer running any scheduled tasks, mark it as idle |
| if (executorIdToTaskIds.contains(executorId)) { |
| executorIdToTaskIds(executorId) -= taskId |
| if (executorIdToTaskIds(executorId).isEmpty) { |
| executorIdToTaskIds -= executorId |
| allocationManager.onExecutorIdle(executorId) |
| } |
| } |
| |
| // If the task failed, we expect it to be resubmitted later. To ensure we have |
| // enough resources to run the resubmitted task, we need to mark the scheduler |
| // as backlogged again if it's not already marked as such (SPARK-8366) |
| if (taskEnd.reason != Success) { |
| if (totalPendingTasks() == 0) { |
| allocationManager.onSchedulerBacklogged() |
| } |
| if (taskEnd.taskInfo.speculative) { |
| stageIdToSpeculativeTaskIndices.get(stageId).foreach {_.remove(taskIndex)} |
| } else { |
| stageIdToTaskIndices.get(stageId).foreach {_.remove(taskIndex)} |
| } |
| } |
| } |
| } |
| |
| override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { |
| val executorId = executorAdded.executorId |
| if (executorId != SparkContext.DRIVER_IDENTIFIER) { |
| // This guards against the race condition in which the `SparkListenerTaskStart` |
| // event is posted before the `SparkListenerBlockManagerAdded` event, which is |
| // possible because these events are posted in different threads. (see SPARK-4951) |
| if (!allocationManager.executorIds.contains(executorId)) { |
| allocationManager.onExecutorAdded(executorId) |
| } |
| } |
| } |
| |
| override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { |
| allocationManager.onExecutorRemoved(executorRemoved.executorId) |
| } |
| |
| override def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted) |
| : Unit = { |
| val stageId = speculativeTask.stageId |
| |
| allocationManager.synchronized { |
| stageIdToNumSpeculativeTasks(stageId) = |
| stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1 |
| allocationManager.onSchedulerBacklogged() |
| } |
| } |
| |
| /** |
| * An estimate of the total number of pending tasks remaining for currently running stages. Does |
| * not account for tasks which may have failed and been resubmitted. |
| * |
| * Note: This is not thread-safe without the caller owning the `allocationManager` lock. |
| */ |
| def pendingTasks(): Int = { |
| stageIdToNumTasks.map { case (stageId, numTasks) => |
| numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0) |
| }.sum |
| } |
| |
| def pendingSpeculativeTasks(): Int = { |
| stageIdToNumSpeculativeTasks.map { case (stageId, numTasks) => |
| numTasks - stageIdToSpeculativeTaskIndices.get(stageId).map(_.size).getOrElse(0) |
| }.sum |
| } |
| |
| def totalPendingTasks(): Int = { |
| pendingTasks + pendingSpeculativeTasks |
| } |
| |
| /** |
| * The number of tasks currently running across all stages. |
| */ |
| def totalRunningTasks(): Int = { |
| stageIdToNumRunningTask.values.sum |
| } |
| |
| /** |
| * Return true if an executor is not currently running a task, and false otherwise. |
| * |
| * Note: This is not thread-safe without the caller owning the `allocationManager` lock. |
| */ |
| def isExecutorIdle(executorId: String): Boolean = { |
| !executorIdToTaskIds.contains(executorId) |
| } |
| |
| /** |
| * Update the Executor placement hints (the number of tasks with locality preferences, |
| * a map where each pair is a node and the number of tasks that would like to be scheduled |
| * on that node). |
| * |
| * These hints are updated when stages arrive and complete, so are not up-to-date at task |
| * granularity within stages. |
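| * |
| * For example (illustrative): merging the hints (2, Map("host1" -> 2)) and |
| * (1, Map("host1" -> 1, "host2" -> 1)) from two stages yields 3 locality-aware tasks and |
| * Map("host1" -> 3, "host2" -> 1). |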
| */ |
| def updateExecutorPlacementHints(): Unit = { |
| var localityAwareTasks = 0 |
| val localityToCount = new mutable.HashMap[String, Int]() |
| stageIdToExecutorPlacementHints.values.foreach { case (numTasksPending, localities) => |
| localityAwareTasks += numTasksPending |
| localities.foreach { case (hostname, count) => |
| val updatedCount = localityToCount.getOrElse(hostname, 0) + count |
| localityToCount(hostname) = updatedCount |
| } |
| } |
| |
| allocationManager.localityAwareTasks = localityAwareTasks |
| allocationManager.hostToLocalTaskCount = localityToCount.toMap |
| } |
| } |
| |
| /** |
| * Metric source for ExecutorAllocationManager to expose its internal executor allocation |
| * status to MetricsSystem. |
| * Note: these metrics rely heavily on the internal implementation of |
| * ExecutorAllocationManager; the set of metrics and their values may change whenever the |
| * internal implementation changes, so they are not stable across Spark versions. |
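| * |
| * The gauges below are registered under the "executors" namespace within this source, e.g. |
| * the current target is exposed as the gauge "executors.numberTargetExecutors". |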
| */ |
| private[spark] class ExecutorAllocationManagerSource extends Source { |
| val sourceName = "ExecutorAllocationManager" |
| val metricRegistry = new MetricRegistry() |
| |
| private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = { |
| metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] { |
| override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) } |
| }) |
| } |
| |
| registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0) |
| registerGauge("numberExecutorsPendingToRemove", executorsPendingToRemove.size, 0) |
| registerGauge("numberAllExecutors", executorIds.size, 0) |
| registerGauge("numberTargetExecutors", numExecutorsTarget, 0) |
| registerGauge("numberMaxNeededExecutors", maxNumExecutorsNeeded(), 0) |
| } |
| } |
| |
| private object ExecutorAllocationManager { |
| val NOT_SET = Long.MaxValue |
| val TESTING_SCHEDULE_INTERVAL_KEY = "spark.testing.dynamicAllocation.scheduleInterval" |
| } |