/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.storage

import java.io.IOException
import java.util.{HashMap => JHashMap}
import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, TimeoutException}
import scala.jdk.CollectionConverters._
import scala.util.Random
import scala.util.control.NonFatal

import com.google.common.cache.CacheBuilder

import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.{config, Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.RDD_CACHE_VISIBILITY_TRACKING_ENABLED
import org.apache.spark.network.shuffle.{ExternalBlockStoreClient, RemoteBlockPushResolver}
import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}
import org.apache.spark.util.ArrayImplicits._

/**
* BlockManagerMasterEndpoint is an [[IsolatedThreadSafeRpcEndpoint]] on the master node to
* track statuses of all the storage endpoints' block managers.
*/
private[spark]
class BlockManagerMasterEndpoint(
override val rpcEnv: RpcEnv,
val isLocal: Boolean,
conf: SparkConf,
listenerBus: LiveListenerBus,
externalBlockStoreClient: Option[ExternalBlockStoreClient],
blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo],
mapOutputTracker: MapOutputTrackerMaster,
private val _shuffleManager: ShuffleManager,
isDriver: Boolean)
extends IsolatedThreadSafeRpcEndpoint with Logging {
// The ShuffleManager is initialized later, in SparkContext and Executor, to allow user jars
// to define custom ShuffleManagers; as such, `_shuffleManager` will be null here (except in
// tests) and we ask SparkEnv for the instance instead.
private lazy val shuffleManager = Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)
// Mapping from executor id to the block manager's local disk directories.
private val executorIdToLocalDirs =
CacheBuilder
.newBuilder()
.maximumSize(conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE))
.build[String, Array[String]]()
// Mapping from external shuffle service block manager id to the block statuses.
private val blockStatusByShuffleService =
new mutable.HashMap[BlockManagerId, BlockStatusPerBlockId]
// Mapping from executor ID to block manager ID.
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
// Set of block managers which are decommissioning
private val decommissioningBlockManagerSet = new mutable.HashSet[BlockManagerId]
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
// Mapping from task id to the set of rdd blocks which are generated from the task.
private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]]
// Record the RDD blocks which are not visible yet. A block is removed from this collection
// once at least one task generating the block finishes successfully.
private val invisibleRDDBlocks = new mutable.HashSet[RDDBlockId]
// Mapping from host name to the shuffle merger services where the current app registered an
// executor in the past. The oldest hosts are removed in favor of newer locations once the
// maxRetainedMergerLocations size is reached.
private val shuffleMergerLocations = new mutable.LinkedHashMap[String, BlockManagerId]()
// Maximum number of merger locations to cache
private val maxRetainedMergerLocations = conf.get(config.SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS)
private val askThreadPool =
ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100)
private implicit val askExecutionContext: ExecutionContextExecutorService =
ExecutionContext.fromExecutorService(askThreadPool)
private val topologyMapper = {
val topologyMapperClassName = conf.get(
config.STORAGE_REPLICATION_TOPOLOGY_MAPPER)
val clazz = Utils.classForName(topologyMapperClassName)
val mapper =
clazz.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[TopologyMapper]
logInfo(s"Using $topologyMapperClassName for getting topology information")
mapper
}
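// For illustration only: a deployment could plug in a custom mapper through the config key
// behind `config.STORAGE_REPLICATION_TOPOLOGY_MAPPER`. A hypothetical sketch (the class name
// and rack scheme are made up):
//
//   class RackTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) {
//     // Resolve a host name to a rack identifier, e.g. "host-1" -> Some("/rack-1").
//     override def getTopologyForHost(hostname: String): Option[String] =
//       Some(s"/rack-${math.abs(hostname.hashCode) % 2}")
//   }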
val proactivelyReplicate = conf.get(config.STORAGE_REPLICATION_PROACTIVE)
val defaultRpcTimeout = RpcUtils.askRpcTimeout(conf)
private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf, isDriver)
logInfo("BlockManagerMasterEndpoint up")
private val externalShuffleServiceRemoveShuffleEnabled: Boolean =
externalBlockStoreClient.isDefined && conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED)
private val externalShuffleServiceRddFetchEnabled: Boolean =
externalBlockStoreClient.isDefined && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
private val externalShuffleServicePort: Int = StorageUtils.externalShuffleServicePort(conf)
private lazy val driverEndpoint =
RpcUtils.makeDriverRef(CoarseGrainedSchedulerBackend.ENDPOINT_NAME, conf, rpcEnv)
/** Whether rdd cache visibility tracking is enabled. */
private val trackingCacheVisibility: Boolean = conf.get(RDD_CACHE_VISIBILITY_TRACKING_ENABLED)
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterBlockManager(
id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint, isReRegister) =>
context.reply(
register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint, isReRegister))
case _updateBlockInfo @
UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
@inline def handleResult(success: Boolean): Unit = {
// SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo
// returns false since the block info would be updated again later.
if (success) {
listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
}
context.reply(success)
}
if (blockId.isShuffle) {
updateShuffleBlockInfo(blockId, blockManagerId).foreach(handleResult)
} else {
handleResult(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
}
case GetLocations(blockId) =>
context.reply(getLocations(blockId))
case GetLocationsAndStatus(blockId, requesterHost) =>
context.reply(getLocationsAndStatus(blockId, requesterHost))
case GetLocationsMultipleBlockIds(blockIds) =>
context.reply(getLocationsMultipleBlockIds(blockIds))
case GetPeers(blockManagerId) =>
context.reply(getPeers(blockManagerId))
case GetExecutorEndpointRef(executorId) =>
context.reply(getExecutorEndpointRef(executorId))
case GetMemoryStatus =>
context.reply(memoryStatus)
case GetStorageStatus =>
context.reply(storageStatus)
case GetBlockStatus(blockId, askStorageEndpoints) =>
context.reply(blockStatus(blockId, askStorageEndpoints))
case GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter) =>
context.reply(getShufflePushMergerLocations(numMergersNeeded, hostsToFilter))
case RemoveShufflePushMergerLocation(host) =>
context.reply(removeShufflePushMergerLocation(host))
case IsExecutorAlive(executorId) =>
context.reply(blockManagerIdByExecutor.contains(executorId))
case GetMatchingBlockIds(filter, askStorageEndpoints) =>
context.reply(getMatchingBlockIds(filter, askStorageEndpoints))
case RemoveRdd(rddId) =>
context.reply(removeRdd(rddId))
case RemoveShuffle(shuffleId) =>
context.reply(removeShuffle(shuffleId))
case RemoveBroadcast(broadcastId, removeFromDriver) =>
context.reply(removeBroadcast(broadcastId, removeFromDriver))
case RemoveBlock(blockId) =>
removeBlockFromWorkers(blockId)
context.reply(true)
case RemoveExecutor(execId) =>
removeExecutor(execId)
context.reply(true)
case DecommissionBlockManagers(executorIds) =>
// Mark the corresponding BlockManagers as decommissioning by adding them to
// decommissioningBlockManagerSet, so they won't be used to replicate or migrate blocks.
// Note that BlockManagerStorageEndpoint will be notified about decommissioning when the
// executor is notified (see BlockManager.decommissionSelf), so we don't need to send the
// notification here.
val bms = executorIds.flatMap(blockManagerIdByExecutor.get)
logInfo(s"Mark BlockManagers (${bms.mkString(", ")}) as being decommissioning.")
decommissioningBlockManagerSet ++= bms
context.reply(true)
case GetReplicateInfoForRDDBlocks(blockManagerId) =>
context.reply(getReplicateInfoForRDDBlocks(blockManagerId))
case StopBlockManagerMaster =>
context.reply(true)
stop()
case UpdateRDDBlockTaskInfo(blockId, taskId) =>
// This reports that an RDD block (with `blockId`) has been computed and cached by the task
// (with `taskId`). It happens right after the task finishes computing/caching the block, and
// only when the block is not visible yet. The RDD block will be marked as visible once the
// corresponding task finishes successfully.
context.reply(updateRDDBlockTaskInfo(blockId, taskId))
case GetRDDBlockVisibility(blockId) =>
// Get the visibility status of a specific rdd block.
context.reply(isRDDBlockVisible(blockId))
case UpdateRDDBlockVisibility(taskId, visible) =>
// This reports whether the RDD blocks computed by the task (with `taskId`) can be turned
// visible. It is reported by DAGScheduler right after the task completes. If the task
// finished successfully, the RDD blocks can be turned visible; otherwise their visibility
// status won't change.
context.reply(updateRDDBlockVisibility(taskId, visible))
}
private def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
if (trackingCacheVisibility) {
blockLocations.containsKey(blockId) &&
blockLocations.get(blockId).nonEmpty && !invisibleRDDBlocks.contains(blockId)
} else {
// Blocks should always be visible if the feature flag is disabled.
true
}
}
private def updateRDDBlockVisibility(taskId: Long, visible: Boolean): Unit = {
if (!trackingCacheVisibility) {
// Do nothing if the feature flag is disabled.
return
}
// TODO: When visible is false (the task has failed), we should ask the block managers to
// evict the block, since the results can be inconsistent if any indeterminate operation
// computed the rdd. Besides evicting the blocks here, when an rdd block is reported we may
// also need to check the data against existing replicas somehow.
// This will be tracked with jira: https://issues.apache.org/jira/browse/SPARK-42582
if (visible) {
tidToRddBlockIds.get(taskId).foreach { blockIds =>
blockIds.foreach { blockId =>
invisibleRDDBlocks.remove(blockId)
// Ask block managers to update the visibility status.
val msg = MarkRDDBlockAsVisible(blockId)
getLocations(blockId).flatMap(blockManagerInfo.get).foreach { managerInfo =>
managerInfo.storageEndpoint.ask[Unit](msg)
}
}
}
}
tidToRddBlockIds.remove(taskId)
}
private def updateRDDBlockTaskInfo(blockId: RDDBlockId, taskId: Long): Unit = {
if (!trackingCacheVisibility) {
// Do nothing if the feature flag is disabled.
return
}
tidToRddBlockIds.getOrElseUpdate(taskId, new mutable.HashSet[RDDBlockId])
.add(blockId)
}
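// A sketch of the visibility lifecycle implemented by the two handlers above (assuming
// RDD_CACHE_VISIBILITY_TRACKING_ENABLED is set):
//   1. A task (taskId = 5) caches rdd_1_0; updateBlockInfo records the location and, as it is
//      the first replica, adds rdd_1_0 to invisibleRDDBlocks.
//   2. UpdateRDDBlockTaskInfo(rdd_1_0, 5) records the mapping in tidToRddBlockIds.
//   3. If task 5 succeeds, UpdateRDDBlockVisibility(5, visible = true) removes rdd_1_0 from
//      invisibleRDDBlocks and asks every hosting block manager to MarkRDDBlockAsVisible;
//      if it fails, only the tidToRddBlockIds entry is dropped.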
/**
 * A function used to handle failures when removing blocks. In general, such a failure
 * should be considered non-fatal since it won't cause any correctness issue. Therefore,
 * this function prefers to log the exception and return the default value. We only throw
 * the exception when there's a TimeoutException from an active executor, which implies the
 * executor is unhealthy while the driver is not yet aware of it.
 * @param blockType should be one of "RDD", "shuffle", "broadcast", "block"; used for logging
 * @param blockId the string value of a certain block id, used for logging
 * @param bmId the BlockManagerId of the BlockManager from which we're trying to remove the block
 * @param defaultValue the value returned on a failed removal, e.g. 0 means no blocks were removed
 * @tparam T the generic type of defaultValue, Int or Boolean.
 * @return the defaultValue, or throw the exception if the executor is active but replied late.
 */
private def handleBlockRemovalFailure[T](
blockType: String,
blockId: String,
bmId: BlockManagerId,
defaultValue: T): PartialFunction[Throwable, T] = {
case e: IOException =>
if (!SparkContext.getActive.map(_.isStopped).getOrElse(true)) {
logWarning(log"Error trying to remove ${MDC(BLOCK_TYPE, blockType)} " +
log"${MDC(BLOCK_ID, blockId)}" +
log" from block manager ${MDC(BLOCK_MANAGER_ID, bmId)}", e)
}
defaultValue
case t: TimeoutException =>
val executorId = bmId.executorId
val isAlive = try {
driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(executorId))
} catch {
// Ignore the non-fatal error from driverEndpoint since the caller doesn't really
// care about the return result of removing blocks. That way we avoid breaking
// down the whole application.
case NonFatal(e) =>
logError(log"Cannot determine whether executor " +
log"${MDC(EXECUTOR_ID, executorId)} is alive or not.", e)
false
}
if (!isAlive) {
logWarning(log"Error trying to remove ${MDC(BLOCK_TYPE, blockType)} " +
log"${MDC(BLOCK_ID, blockId)}. " +
log"The executor ${MDC(EXECUTOR_ID, executorId)} may have been lost.", t)
defaultValue
} else {
throw t
}
}
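// A minimal usage sketch, mirroring the `.recover` call sites below: a failed removal on a
// lost executor degrades to the default of 0 removed blocks instead of failing the future.
//
//   bmInfo.storageEndpoint.ask[Int](RemoveRdd(rddId)).recover {
//     handleBlockRemovalFailure("RDD", rddId.toString, bmInfo.blockManagerId, 0)
//   }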
private def removeRdd(rddId: Int): Future[Seq[Int]] = {
// First remove the metadata for the given RDD, and then asynchronously remove the blocks
// from the storage endpoints.
// The message sent to the storage endpoints to remove the RDD
val removeMsg = RemoveRdd(rddId)
// Find all blocks for the given RDD, remove each block from both blockLocations and the
// blockManagerInfo that is tracking it, and create the futures which asynchronously remove
// the blocks from the storage endpoints and give back the number of removed blocks
val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
val blocksToDeleteByShuffleService =
new mutable.HashMap[BlockManagerId, mutable.HashSet[RDDBlockId]]
blocks.foreach { blockId =>
val bms: mutable.HashSet[BlockManagerId] = blockLocations.remove(blockId)
if (trackingCacheVisibility) {
invisibleRDDBlocks.remove(blockId)
}
val (bmIdsExtShuffle, bmIdsExecutor) = bms.partition(_.port == externalShuffleServicePort)
val liveExecutorsForBlock = bmIdsExecutor.map(_.executorId).toSet
bmIdsExtShuffle.foreach { bmIdForShuffleService =>
// if the original executor is already released then delete this disk block via
// the external shuffle service
if (!liveExecutorsForBlock.contains(bmIdForShuffleService.executorId)) {
val blockIdsToDel = blocksToDeleteByShuffleService.getOrElseUpdate(bmIdForShuffleService,
new mutable.HashSet[RDDBlockId]())
blockIdsToDel += blockId
blockStatusByShuffleService.get(bmIdForShuffleService).foreach { blockStatusForId =>
blockStatusForId.remove(blockId)
}
}
}
bmIdsExecutor.foreach { bmId =>
blockManagerInfo.get(bmId).foreach { bmInfo =>
bmInfo.removeBlock(blockId)
}
}
}
val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
bmInfo.storageEndpoint.ask[Int](removeMsg).recover {
// use 0 as the default value, meaning no blocks were removed
handleBlockRemovalFailure("RDD", rddId.toString, bmInfo.blockManagerId, 0)
}
}.toSeq
val removeRddBlockViaExtShuffleServiceFutures = if (externalShuffleServiceRddFetchEnabled) {
externalBlockStoreClient.map { shuffleClient =>
blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
Future[Int] {
val numRemovedBlocks = shuffleClient.removeBlocks(
bmId.host,
bmId.port,
bmId.executorId,
blockIds.map(_.toString).toArray)
numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
}
}
}.getOrElse(Seq.empty)
} else {
Seq.empty
}
Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
}
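// Note the two removal paths above: every live block manager is asked directly via the
// RemoveRdd message, while disk-persisted blocks whose original executor is gone are deleted
// through the external shuffle service client; both paths report counts of removed blocks.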
private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
// Find all shuffle blocks on executors that are no longer running
val blocksToDeleteByShuffleService =
new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
if (externalShuffleServiceRemoveShuffleEnabled) {
mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
shuffleStatus.withMapStatuses { mapStatuses =>
mapStatuses.foreach { mapStatus =>
// Check if the executor has been deallocated
if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
val blocksToDel =
shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId)
if (blocksToDel.nonEmpty) {
val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location,
new mutable.HashSet[BlockId])
blocks ++= blocksToDel
}
}
}
}
}
}
val removeShuffleFromShuffleServicesFutures =
externalBlockStoreClient.map { shuffleClient =>
blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
Future[Boolean] {
val numRemovedBlocks = shuffleClient.removeBlocks(
bmId.host,
bmId.port,
bmId.executorId,
blockIds.map(_.toString).toArray)
numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds,
TimeUnit.SECONDS) == blockIds.size
}
}
}.getOrElse(Seq.empty)
val removeShuffleMergeFromShuffleServicesFutures =
externalBlockStoreClient.map { shuffleClient =>
val mergerLocations =
if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) {
mapOutputTracker.getShufflePushMergerLocations(shuffleId)
} else {
Seq.empty[BlockManagerId]
}
mergerLocations.map { bmId =>
Future[Boolean] {
shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId,
RemoteBlockPushResolver.DELETE_ALL_MERGED_SHUFFLE)
}
}
}.getOrElse(Seq.empty)
val removeMsg = RemoveShuffle(shuffleId)
val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
bm.storageEndpoint.ask[Boolean](removeMsg).recover {
// use false as the default value, meaning no shuffle data was removed
handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
}
}.toSeq
Future.sequence(removeShuffleFromExecutorsFutures ++
removeShuffleFromShuffleServicesFutures ++
removeShuffleMergeFromShuffleServicesFutures)
}
/**
 * Delegate RemoveBroadcast messages to each BlockManager because the master may not be
 * notified of all broadcast blocks. If removeFromDriver is false, broadcast blocks are only
 * removed from the executors, not from the driver.
 */
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
removeFromDriver || !info.blockManagerId.isDriver
}
val futures = requiredBlockManagers.map { bm =>
bm.storageEndpoint.ask[Int](removeMsg).recover {
// use 0 as the default value, meaning no blocks were removed
handleBlockRemovalFailure("broadcast", broadcastId.toString, bm.blockManagerId, 0)
}
}.toSeq
Future.sequence(futures)
}
private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
val info = blockManagerInfo(blockManagerId)
// Remove the block manager from blockManagerIdByExecutor.
blockManagerIdByExecutor -= blockManagerId.executorId
decommissioningBlockManagerSet.remove(blockManagerId)
// Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)
val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
val locations = blockLocations.get(blockId)
locations -= blockManagerId
// De-register the block if none of the block managers have it. Otherwise, if pro-active
// replication is enabled, and a block is either an RDD or a test block (the latter is used
// for unit testing), we send a message to a randomly chosen executor location to replicate
// the given block. Note that we ignore other block types (such as broadcast/shuffle blocks
// etc.) as replication doesn't make much sense in that context.
if (locations.isEmpty) {
blockLocations.remove(blockId)
logWarning(log"No more replicas available for ${MDC(BLOCK_ID, blockId)}!")
} else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
// As a heuristic, assume a single executor failure to infer the number of replicas that
// existed before the failure.
val maxReplicas = locations.size + 1
val i = (new Random(blockId.hashCode)).nextInt(locations.size)
val blockLocations = locations.toSeq
val candidateBMId = blockLocations(i)
blockManagerInfo.get(candidateBMId).foreach { bm =>
val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
bm.storageEndpoint.ask[Boolean](replicateMsg)
}
}
}
listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
logInfo(s"Removing block manager $blockManagerId")
}
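// Worked example for the proactive replication above: if rdd_2_3 was held by 3 block managers
// and one of them is removed, locations.size is now 2, so maxReplicas = 2 + 1 = 3. One of the
// two survivors is picked pseudo-randomly (seeded with the block id's hashCode) and asked to
// replicate rdd_2_3 to the remaining locations until 3 replicas exist again.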
private def addMergerLocation(blockManagerId: BlockManagerId): Unit = {
if (!blockManagerId.isDriver && !shuffleMergerLocations.contains(blockManagerId.host)) {
val shuffleServerId = BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER,
blockManagerId.host, externalShuffleServicePort)
if (shuffleMergerLocations.size >= maxRetainedMergerLocations) {
shuffleMergerLocations -= shuffleMergerLocations.head._1
}
shuffleMergerLocations(shuffleServerId.host) = shuffleServerId
}
}
private def removeExecutor(execId: String): Unit = {
logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
}
/**
 * Returns a Seq of ReplicateBlock for each RDD block stored by the given blockManagerId
 * @param blockManagerId - block manager id for which ReplicateBlock info is needed
 * @return Seq of ReplicateBlock
 */
private def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
try {
val info = blockManagerInfo(blockManagerId)
val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
rddBlocks.map { blockId =>
val currentBlockLocations = blockLocations.get(blockId)
val maxReplicas = currentBlockLocations.size + 1
val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
replicateMsg
}.toSeq
} catch {
// If the block manager has already exited, nothing to replicate.
case _: java.util.NoSuchElementException =>
Seq.empty[ReplicateBlock]
}
}
// Remove a block from the workers that have it. This can only be used to remove
// blocks that the master knows about.
private def removeBlockFromWorkers(blockId: BlockId): Unit = {
val locations = blockLocations.get(blockId)
if (locations != null) {
locations.foreach { blockManagerId: BlockManagerId =>
val blockManager = blockManagerInfo.get(blockManagerId)
blockManager.foreach { bm =>
// Remove the block from the BlockManager.
// Doesn't actually wait for a confirmation and the message might get lost.
// If message loss becomes frequent, we should add retry logic here.
bm.storageEndpoint.ask[Boolean](RemoveBlock(blockId)).recover {
// use false as the default value, meaning no blocks were removed
handleBlockRemovalFailure("block", blockId.toString, bm.blockManagerId, false)
}
}
}
}
}
// Return a map from the block manager id to max memory and remaining memory.
private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
blockManagerInfo.map { case(blockManagerId, info) =>
(blockManagerId, (info.maxMem, info.remainingMem))
}.toMap
}
private def storageStatus: Array[StorageStatus] = {
blockManagerInfo.map { case (blockManagerId, info) =>
new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
Some(info.maxOffHeapMem), info.blocks.asScala)
}.toArray
}
/**
* Return the block's status for all block managers, if any. NOTE: This is a
* potentially expensive operation and should only be used for testing.
*
* If askStorageEndpoints is true, the master queries each block manager for the most updated
* block statuses. This is useful when the master is not informed of the given block by all block
* managers.
*/
private def blockStatus(
blockId: BlockId,
askStorageEndpoints: Boolean): Map[BlockManagerId, Future[Option[BlockStatus]]] = {
val getBlockStatus = GetBlockStatus(blockId)
/*
* Rather than blocking on the block status query, master endpoint should simply return
* Futures to avoid potential deadlocks. This can arise if there exists a block manager
* that is also waiting for this master endpoint's response to a previous message.
*/
blockManagerInfo.values.map { info =>
val blockStatusFuture =
if (askStorageEndpoints) {
info.storageEndpoint.ask[Option[BlockStatus]](getBlockStatus)
} else {
Future { info.getStatus(blockId) }
}
(info.blockManagerId, blockStatusFuture)
}.toMap
}
/**
* Return the ids of blocks present in all the block managers that match the given filter.
* NOTE: This is a potentially expensive operation and should only be used for testing.
*
* If askStorageEndpoints is true, the master queries each block manager for the most updated
* block statuses. This is useful when the master is not informed of the given block by all block
* managers.
*/
private def getMatchingBlockIds(
filter: BlockId => Boolean,
askStorageEndpoints: Boolean): Future[Seq[BlockId]] = {
val getMatchingBlockIds = GetMatchingBlockIds(filter)
Future.sequence(
blockManagerInfo.values.map { info =>
val future =
if (askStorageEndpoints) {
info.storageEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
} else {
Future { info.blocks.asScala.keys.filter(filter).toSeq }
}
future
}
).map(_.flatten.toSeq)
}
private def externalShuffleServiceIdOnHost(blockManagerId: BlockManagerId): BlockManagerId = {
// we need to keep the executor ID of the original executor to let the shuffle service know
// which local directories should be used to look for the file
BlockManagerId(blockManagerId.executorId, blockManagerId.host, externalShuffleServicePort)
}
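// For example, assuming the default external shuffle service port 7337:
//   externalShuffleServiceIdOnHost(BlockManagerId("exec-7", "host-a", 43512))
//     returns BlockManagerId("exec-7", "host-a", 7337)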
/**
* Returns the BlockManagerId with topology information populated, if available.
*/
private def register(
idWithoutTopologyInfo: BlockManagerId,
localDirs: Array[String],
maxOnHeapMemSize: Long,
maxOffHeapMemSize: Long,
storageEndpoint: RpcEndpointRef,
isReRegister: Boolean): BlockManagerId = {
// The dummy id is not expected to contain the topology information.
// We get that info here and respond back with a more fleshed-out block manager id.
val id = BlockManagerId(
idWithoutTopologyInfo.executorId,
idWithoutTopologyInfo.host,
idWithoutTopologyInfo.port,
topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))
val time = System.currentTimeMillis()
executorIdToLocalDirs.put(id.executorId, localDirs)
// SPARK-41360: For the block manager re-registration, we should only allow it when
// the executor is recognized as active by the scheduler backend. Otherwise, this kind
// of re-registration from the terminating/stopped executor is meaningless and harmful.
lazy val isExecutorAlive =
driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))
if (!blockManagerInfo.contains(id) && (!isReRegister || isExecutorAlive)) {
blockManagerIdByExecutor.get(id.executorId) match {
case Some(oldId) =>
// A block manager of the same executor already exists, so remove it (assumed dead)
logError(log"Got two different block manager registrations on same executor - "
+ log" will replace old one ${MDC(OLD_BLOCK_MANAGER_ID, oldId)} " +
log"with new one ${MDC(BLOCK_MANAGER_ID, id)}")
removeExecutor(id.executorId)
case None =>
}
logInfo("Registering block manager %s with %s RAM, %s".format(
id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id))
blockManagerIdByExecutor(id.executorId) = id
val externalShuffleServiceBlockStatus =
if (externalShuffleServiceRddFetchEnabled) {
// The blockStatusByShuffleService entries are never removed, as they belong to the
// external shuffle service instances running on the cluster nodes. To decrease the memory
// footprint, BlockStatusPerBlockId releases its backing HashMap once all the disk-persisted
// blocks tracked for a shuffle service have been removed.
val externalShuffleServiceBlocks = blockStatusByShuffleService
.getOrElseUpdate(externalShuffleServiceIdOnHost(id), new BlockStatusPerBlockId)
Some(externalShuffleServiceBlocks)
} else {
None
}
blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint, externalShuffleServiceBlockStatus)
if (pushBasedShuffleEnabled) {
addMergerLocation(id)
}
listenerBus.post(SparkListenerBlockManagerAdded(time, id,
maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
}
val updatedId = if (isReRegister && !isExecutorAlive) {
assert(!blockManagerInfo.contains(id),
"BlockManager re-registration shouldn't succeed when the executor is lost")
logInfo(s"BlockManager ($id) re-registration is rejected since " +
s"the executor (${id.executorId}) has been lost")
// Use "invalid" as the return executor id to indicate the block manager that
// re-registration failed. It's a bit hacky but fine since the returned block
// manager id won't be accessed in the case of re-registration. And we'll use
// this "invalid" executor id to print better logs and avoid blocks reporting.
BlockManagerId(
BlockManagerId.INVALID_EXECUTOR_ID,
id.host,
id.port,
id.topologyInfo)
} else {
id
}
updatedId
}
private def updateShuffleBlockInfo(blockId: BlockId, blockManagerId: BlockManagerId)
: Future[Boolean] = {
blockId match {
case ShuffleIndexBlockId(shuffleId, mapId, _) =>
// SPARK-36782: Invoking `MapOutputTracker.updateMapOutput` within the thread
// `dispatcher-BlockManagerMaster` can lead to a deadlock when
// `MapOutputTracker.serializeOutputStatuses` broadcasts the serialized map statuses under
// the acquired write lock. The broadcast block would report its status to
// `BlockManagerMasterEndpoint`, while the `BlockManagerMasterEndpoint` is occupied by
// `updateMapOutput` since it's waiting for the write lock. Thus, we use `Future` to call
// `updateMapOutput` in a separate thread to avoid the deadlock.
Future {
// We key the map output update on the index block, since a map output may exist as an
// index-only block
logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, updating.")
mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
true
}
case ShuffleDataBlockId(shuffleId: Int, mapId: Long, _: Int) =>
logDebug(s"Received shuffle data block update for ${shuffleId} ${mapId}, ignore.")
Future.successful(true)
case _ =>
logDebug(s"Unexpected shuffle block type ${blockId}" +
s"as ${blockId.getClass().getSimpleName()}")
Future.successful(false)
}
}
private def updateBlockInfo(
blockManagerId: BlockManagerId,
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long): Boolean = {
logDebug(s"Updating block info on master ${blockId} for ${blockManagerId}")
if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.isDriver && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
return true
} else {
return false
}
}
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
return true
}
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
var locations: mutable.HashSet[BlockManagerId] = null
if (blockLocations.containsKey(blockId)) {
locations = blockLocations.get(blockId)
} else {
locations = new mutable.HashSet[BlockManagerId]
blockLocations.put(blockId, locations)
}
if (storageLevel.isValid) {
val firstBlock = locations.isEmpty
locations.add(blockManagerId)
blockId.asRDDId.foreach { rddBlockId =>
(trackingCacheVisibility, firstBlock) match {
case (true, true) =>
// Mark as invisible for the first block.
invisibleRDDBlocks.add(rddBlockId)
case (true, false) if !invisibleRDDBlocks.contains(rddBlockId) =>
// If the rdd block is already visible, ask storage manager to update the visibility
// status.
blockManagerInfo(blockManagerId).storageEndpoint
.ask[Unit](MarkRDDBlockAsVisible(rddBlockId))
case _ =>
}
}
} else {
locations.remove(blockManagerId)
}
if (blockId.isRDD && storageLevel.useDisk && externalShuffleServiceRddFetchEnabled) {
val externalShuffleServiceId = externalShuffleServiceIdOnHost(blockManagerId)
if (storageLevel.isValid) {
locations.add(externalShuffleServiceId)
} else {
locations.remove(externalShuffleServiceId)
}
}
// Remove the block from master tracking if it has been removed on all endpoints.
if (locations.isEmpty) {
blockLocations.remove(blockId)
}
true
}
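// For example: an executor caching rdd_0_1 at MEMORY_AND_DISK reports a valid storage level,
// so its id is added to blockLocations(rdd_0_1) (plus the shuffle service id when disk-backed
// RDD fetching is enabled); a later report with an invalid level means the block was dropped
// there, and once no locations remain the block leaves master tracking entirely.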
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}
private def getLocationsAndStatus(
blockId: BlockId,
requesterHost: String): Option[BlockLocationsAndStatus] = {
val locations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)
val status = locations.headOption.flatMap { bmId =>
if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
blockStatusByShuffleService.get(bmId).flatMap(m => m.get(blockId))
} else {
blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId))
}
}
if (locations.nonEmpty && status.isDefined) {
val localDirs = locations.find { loc =>
// When the external shuffle service running on the same host is found among the block
// locations then the block must be persisted on the disk. In this case the executorId
// can be used to access this block even when the original executor is already stopped.
loc.host == requesterHost &&
(loc.port == externalShuffleServicePort ||
blockManagerInfo
.get(loc)
.flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk))
.getOrElse(false))
}.flatMap { bmId => Option(executorIdToLocalDirs.getIfPresent(bmId.executorId)) }
Some(BlockLocationsAndStatus(locations, status.get, localDirs))
} else {
None
}
}
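// For example: if the requester runs on "host-a" and one of the block's locations on "host-a"
// is either the external shuffle service or an executor holding the block on disk, that
// executor's cached local directories are returned so the caller can read the block straight
// from the local disk instead of fetching it over the network.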
private def getLocationsMultipleBlockIds(
blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
blockIds.map(blockId => getLocations(blockId)).toImmutableArraySeq
}
/** Get the list of the peers of the given block manager */
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
val blockManagerIds = blockManagerInfo.keySet
if (blockManagerIds.contains(blockManagerId)) {
blockManagerIds
.filterNot { _.isDriver }
.filterNot { _ == blockManagerId }
.diff(decommissioningBlockManagerSet)
.toSeq
} else {
Seq.empty
}
}
private def getShufflePushMergerLocations(
numMergersNeeded: Int,
hostsToFilter: Set[String]): Seq[BlockManagerId] = {
val blockManagerHosts = blockManagerIdByExecutor
.filterNot(_._2.isDriver).values.map(_.host).toSet
val filteredBlockManagerHosts = blockManagerHosts.diff(hostsToFilter)
val filteredMergersWithExecutors = filteredBlockManagerHosts.map(
BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, _, externalShuffleServicePort))
// Enough mergers are available as part of the active executors list
if (filteredMergersWithExecutors.size >= numMergersNeeded) {
filteredMergersWithExecutors.toSeq
} else {
// Add the delta of mergers needed from the inactive mergers list to the active mergers list
val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host)
val filteredMergersWithoutExecutors = shuffleMergerLocations.values
.filterNot(x => hostsToFilter.contains(x.host))
.filterNot(x => filteredMergersWithExecutorsHosts.contains(x.host))
val randomFilteredMergersLocations =
if (filteredMergersWithoutExecutors.size >
numMergersNeeded - filteredMergersWithExecutors.size) {
Utils.randomize(filteredMergersWithoutExecutors)
.take(numMergersNeeded - filteredMergersWithExecutors.size)
} else {
filteredMergersWithoutExecutors
}
filteredMergersWithExecutors.toSeq ++ randomFilteredMergersLocations
}
}
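// Worked example: with numMergersNeeded = 5 and 3 unfiltered hosts carrying active executors,
// 3 mergers come from the executor hosts and the remaining 5 - 3 = 2 are drawn at random from
// the retained shuffleMergerLocations whose hosts are neither filtered out nor already chosen.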
private def removeShufflePushMergerLocation(host: String): Unit = {
if (shuffleMergerLocations.contains(host)) {
shuffleMergerLocations.remove(host)
}
}
/**
 * Returns an [[RpcEndpointRef]] of the [[BlockManagerStorageEndpoint]] for sending RPC messages.
 */
private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
for (
blockManagerId <- blockManagerIdByExecutor.get(executorId);
info <- blockManagerInfo.get(blockManagerId)
) yield {
info.storageEndpoint
}
}
override def onStop(): Unit = {
askThreadPool.shutdownNow()
}
}
@DeveloperApi
case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) {
def isCached: Boolean = memSize + diskSize > 0
}
@DeveloperApi
object BlockStatus {
def empty: BlockStatus = BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
}
/**
 * Stores block statuses keyed by block ID, but drops the reference to the backing Map once
 * all the blocks are removed, to avoid holding on to the memory when it is not needed.
 */
private[spark] class BlockStatusPerBlockId {
private var blocks: JHashMap[BlockId, BlockStatus] = _
def get(blockId: BlockId): Option[BlockStatus] =
if (blocks == null) None else Option(blocks.get(blockId))
def put(blockId: BlockId, blockStatus: BlockStatus): Unit = {
if (blocks == null) {
blocks = new JHashMap[BlockId, BlockStatus]
}
blocks.put(blockId, blockStatus)
}
def remove(blockId: BlockId): Unit = {
if (blocks != null) {
blocks.remove(blockId)
if (blocks.isEmpty) {
blocks = null
}
}
}
}
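// A small usage sketch of the release behavior (hypothetical block and sizes):
//
//   val statuses = new BlockStatusPerBlockId
//   statuses.put(RDDBlockId(0, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L))
//   statuses.get(RDDBlockId(0, 0))    // Some(BlockStatus(DISK_ONLY, 0, 100))
//   statuses.remove(RDDBlockId(0, 0)) // the now-empty backing map is released (set to null)
//   statuses.get(RDDBlockId(0, 0))    // None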
private[spark] class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
val maxOnHeapMem: Long,
val maxOffHeapMem: Long,
val storageEndpoint: RpcEndpointRef,
val externalShuffleServiceBlockStatus: Option[BlockStatusPerBlockId])
extends Logging {
val maxMem = maxOnHeapMem + maxOffHeapMem
private var _lastSeenMs: Long = timeMs
private var _remainingMem: Long = maxMem
// Mapping from block id to its status.
private val _blocks = new JHashMap[BlockId, BlockStatus]
def getStatus(blockId: BlockId): Option[BlockStatus] = Option(_blocks.get(blockId))
def updateLastSeenMs(): Unit = {
_lastSeenMs = System.currentTimeMillis()
}
def updateBlockInfo(
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long): Unit = {
updateLastSeenMs()
val blockExists = _blocks.containsKey(blockId)
var originalMemSize: Long = 0
var originalDiskSize: Long = 0
var originalLevel: StorageLevel = StorageLevel.NONE
if (blockExists) {
// The block exists on the storage endpoint already.
val blockStatus: BlockStatus = _blocks.get(blockId)
originalLevel = blockStatus.storageLevel
originalMemSize = blockStatus.memSize
originalDiskSize = blockStatus.diskSize
if (originalLevel.useMemory) {
_remainingMem += originalMemSize
}
}
if (storageLevel.isValid) {
/* isValid means it is either stored in memory or on disk.
 * The memSize here indicates the data size in or dropped from memory,
 * and the diskSize here indicates the data size in or dropped to disk.
 * They can both be larger than 0 when a block is dropped from memory to disk.
 * Therefore, a safe way to set BlockStatus is to record the memory and disk
 * sizes separately, as done for each storage mode below. */
var blockStatus: BlockStatus = null
if (storageLevel.useMemory) {
blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0)
_blocks.put(blockId, blockStatus)
_remainingMem -= memSize
if (blockExists) {
logInfo(s"Updated $blockId in memory on ${blockManagerId.hostPort}" +
s" (current size: ${Utils.bytesToString(memSize)}," +
s" original size: ${Utils.bytesToString(originalMemSize)}," +
s" free: ${Utils.bytesToString(_remainingMem)})")
} else {
logInfo(s"Added $blockId in memory on ${blockManagerId.hostPort}" +
s" (size: ${Utils.bytesToString(memSize)}," +
s" free: ${Utils.bytesToString(_remainingMem)})")
}
}
if (storageLevel.useDisk) {
blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize)
_blocks.put(blockId, blockStatus)
if (blockExists) {
logInfo(s"Updated $blockId on disk on ${blockManagerId.hostPort}" +
s" (current size: ${Utils.bytesToString(diskSize)}," +
s" original size: ${Utils.bytesToString(originalDiskSize)})")
} else {
logInfo(s"Added $blockId on disk on ${blockManagerId.hostPort}" +
s" (size: ${Utils.bytesToString(diskSize)})")
}
}
externalShuffleServiceBlockStatus.foreach { shuffleServiceBlocks =>
if (!blockId.isBroadcast && blockStatus.diskSize > 0) {
shuffleServiceBlocks.put(blockId, blockStatus)
}
}
} else if (blockExists) {
// If isValid is not true, drop the block.
_blocks.remove(blockId)
externalShuffleServiceBlockStatus.foreach { blockStatus =>
blockStatus.remove(blockId)
}
if (originalLevel.useMemory) {
logInfo(s"Removed $blockId on ${blockManagerId.hostPort} in memory" +
s" (size: ${Utils.bytesToString(originalMemSize)}," +
s" free: ${Utils.bytesToString(_remainingMem)})")
}
if (originalLevel.useDisk) {
logInfo(s"Removed $blockId on ${blockManagerId.hostPort} on disk" +
s" (size: ${Utils.bytesToString(originalDiskSize)})")
}
}
}
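// Worked example of the accounting above: a 100 MB block cached in memory arrives with a
// level where useMemory is true, so _remainingMem shrinks by 100 MB. If the block is later
// dropped to disk, the update carries useDisk = true and useMemory = false: the original
// 100 MB is credited back to _remainingMem and the status is replaced by a disk-only
// BlockStatus with the reported diskSize.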
def removeBlock(blockId: BlockId): Unit = {
if (_blocks.containsKey(blockId)) {
_remainingMem += _blocks.get(blockId).memSize
_blocks.remove(blockId)
externalShuffleServiceBlockStatus.foreach { blockStatus =>
blockStatus.remove(blockId)
}
}
}
def remainingMem: Long = _remainingMem
def lastSeenMs: Long = _lastSeenMs
def blocks: JHashMap[BlockId, BlockStatus] = _blocks
override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
def clear(): Unit = {
_blocks.clear()
}
}