/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import java.util.concurrent.ScheduledFuture
import scala.reflect.ClassTag
import org.roaringbitmap.RoaringBitmap
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteProcessor}
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils
/**
* :: DeveloperApi ::
* Base class for dependencies.
*/
@DeveloperApi
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}
/**
* :: DeveloperApi ::
* Base class for dependencies where each partition of the child RDD depends on a small number
* of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
*/
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/**
* Get the parent partitions for a child partition.
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}
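// Illustrative sketch (not part of the upstream source): a custom NarrowDependency in which each
// child partition reads two adjacent parent partitions, as a coalesce-by-two would. The class
// name is hypothetical.
//
//   class PairwiseCoalesceDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
//     override def getParents(partitionId: Int): Seq[Int] =
//       Seq(2 * partitionId, 2 * partitionId + 1)
//   }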
/**
* :: DeveloperApi ::
* Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
* the RDD is transient since we don't need it on the executor side.
*
* @param _rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
* explicitly then the default serializer, as specified by `spark.serializer`
* config option, will be used.
* @param keyOrdering key ordering for RDD's shuffle
* @param aggregator map/reduce-side aggregator for RDD's shuffle
* @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
* @param shuffleWriterProcessor the processor to control the write behavior in ShuffleMapTask
*/
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false,
val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
extends Dependency[Product2[K, V]] with Logging {
if (mapSideCombine) {
require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
}
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
// Note: It's possible that the combiner class tag is null, if the combineByKey
// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
private[spark] val combinerClassName: Option[String] =
Option(reflect.classTag[C]).map(_.runtimeClass.getName)
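// A unique ID for this shuffle within the SparkContext, plus the handle returned when the shuffle
// is registered with the ShuffleManager; tasks later use this handle to obtain shuffle readers
// and writers.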
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, this)
private[this] val numPartitions = rdd.partitions.length
// By default, shuffle merge is allowed for ShuffleDependency if push-based shuffle
// is enabled
private[this] var _shuffleMergeAllowed = canShuffleMergeBeEnabled()
private[spark] def setShuffleMergeAllowed(shuffleMergeAllowed: Boolean): Unit = {
_shuffleMergeAllowed = shuffleMergeAllowed
}
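// Shuffle merge (push-based shuffle) is "allowed" when it can be used for this shuffle at all,
// and "enabled" only once merger locations have actually been chosen for the stage.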
def shuffleMergeEnabled: Boolean = shuffleMergeAllowed && mergerLocs.nonEmpty
def shuffleMergeAllowed: Boolean = _shuffleMergeAllowed
/**
* Stores the locations of the external shuffle services chosen to handle the shuffle merge
* requests from mappers in this shuffle map stage.
*/
private[spark] var mergerLocs: Seq[BlockManagerId] = Nil
/**
* Tracks whether the shuffle merge has been finalized for the shuffle map stage
* associated with this shuffle dependency.
*/
private[this] var _shuffleMergeFinalized: Boolean = false
/**
* shuffleMergeId uniquely identifies the merging process of this shuffle, and is incremented
* whenever the merge state is reset for a new attempt of an indeterminate stage.
*/
private[this] var _shuffleMergeId: Int = 0
def shuffleMergeId: Int = _shuffleMergeId
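// Records the external shuffle services chosen by the scheduler to act as mergers for this
// shuffle; may only be called when shuffle merge is allowed.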
def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
assert(shuffleMergeAllowed)
this.mergerLocs = mergerLocs
}
def getMergerLocs: Seq[BlockManagerId] = mergerLocs
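// Marks, and reports, whether shuffle merge finalization has been recorded for the current
// merge attempt of this shuffle.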
private[spark] def markShuffleMergeFinalized(): Unit = {
_shuffleMergeFinalized = true
}
private[spark] def isShuffleMergeFinalizedMarked: Boolean = {
_shuffleMergeFinalized
}
/**
* Returns true if shuffle merge is not enabled for this shuffle, or if the shuffle merge
* for this shuffle has been finalized.
*/
def shuffleMergeFinalized: Boolean = {
if (shuffleMergeEnabled) {
isShuffleMergeFinalizedMarked
} else {
true
}
}
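// Resets all shuffle merge bookkeeping (finalization flag, merger locations, push-completion
// bitmap, pending finalize task) and bumps the merge ID, so that a retried attempt of an
// indeterminate stage starts with a fresh merge state.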
def newShuffleMergeState(): Unit = {
_shuffleMergeFinalized = false
mergerLocs = Nil
_shuffleMergeId += 1
finalizeTask = None
shufflePushCompleted.clear()
}
private def canShuffleMergeBeEnabled(): Boolean = {
val isPushShuffleEnabled = Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf,
// invoked at driver
isDriver = true)
if (isPushShuffleEnabled && rdd.isBarrier()) {
logWarning("Push-based shuffle is currently not supported for barrier stages")
}
isPushShuffleEnabled && numPartitions > 0 &&
// TODO: SPARK-35547: Push-based shuffle is currently unsupported for Barrier stages
!rdd.isBarrier()
}
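// Bitmap of map task indices whose shuffle block pushes have completed; see incPushCompleted.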
@transient private[this] val shufflePushCompleted = new RoaringBitmap()
/**
* Mark a given map task as push completed in the tracking bitmap.
* Using the bitmap ensures that the same map task launched multiple times due to
* either speculation or stage retry is only counted once.
* @param mapIndex Map task index
* @return number of map tasks with block push completed
*/
private[spark] def incPushCompleted(mapIndex: Int): Int = {
shufflePushCompleted.add(mapIndex)
shufflePushCompleted.getCardinality
}
// Only used by DAGScheduler to coordinate shuffle merge finalization
@transient private[this] var finalizeTask: Option[ScheduledFuture[_]] = None
private[spark] def getFinalizeTask: Option[ScheduledFuture[_]] = finalizeTask
private[spark] def setFinalizeTask(task: ScheduledFuture[_]): Unit = {
finalizeTask = Option(task)
}
// The threshold is 2^30 (about 1 billion) shuffle blocks: a bitmap tracking that many blocks
// already takes 2^30 bits (128MB), and the actual size of the `HighlyCompressedMapStatus` can be
// much larger than 128MB, which may crash the driver with an OOM error.
if (numPartitions.toLong * partitioner.numPartitions.toLong > (1L << 30)) {
logWarning(
log"The number of shuffle blocks " +
log"(${MDC(NUM_PARTITIONS, numPartitions.toLong * partitioner.numPartitions.toLong)})" +
log" for shuffleId ${MDC(SHUFFLE_ID, shuffleId)} " +
log"for ${MDC(RDD_DESCRIPTION, _rdd)} " +
log"with ${MDC(NUM_PARTITIONS2, numPartitions)} partitions" +
log" is possibly too large, which could cause the driver to crash with an out-of-memory" +
log" error. Consider decreasing the number of partitions in this shuffle stage."
)
}
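// Register this shuffle with the ContextCleaner, so its state can be cleaned up once this
// dependency goes out of scope, and with the driver-side shuffle components.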
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
_rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}
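// Illustrative sketch (not part of the upstream source): a ShuffleDependency is normally created
// by shuffle-introducing transformations (for example, reduceByKey builds a ShuffledRDD backed by
// a ShuffleDependency). Constructing one directly looks roughly like this; `pairs` (an
// RDD[(String, Int)]) and the summing aggregator are assumptions for illustration.
//
//   val dep = new ShuffleDependency[String, Int, Int](
//     pairs,
//     new HashPartitioner(8),
//     keyOrdering = None,
//     aggregator = Some(new Aggregator[String, Int, Int]((v: Int) => v, _ + _, _ + _)),
//     mapSideCombine = true) // map-side combine requires the aggregator above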
/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between partitions of the parent and child RDDs.
*/
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
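// Illustrative sketch (not part of the upstream source): one-to-one dependencies are what
// element-wise transformations such as map and filter produce, so child partition i reads only
// parent partition i. `parentRdd` is a placeholder.
//
//   val dep = new OneToOneDependency(parentRdd)
//   dep.getParents(3) // List(3)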
/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
* @param rdd the parent RDD
* @param inStart the start of the range in the parent RDD
* @param outStart the start of the range in the child RDD
* @param length the length of the range
*/
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}
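// Illustrative sketch (not part of the upstream source): RangeDependency is how, for example,
// UnionRDD wires each parent's partitions into a contiguous range of child partitions. For a
// parent whose 3 partitions occupy child partitions 4 to 6 (`parentRdd` is a placeholder):
//
//   val dep = new RangeDependency[Int](parentRdd, inStart = 0, outStart = 4, length = 3)
//   dep.getParents(5) // List(1): child partition 5 maps back to parent partition 1
//   dep.getParents(2) // Nil: child partition 2 falls outside this parent's range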