/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle.sort

import java.util.concurrent.ConcurrentHashMap

import scala.jdk.CollectionConverters._

import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle._
import org.apache.spark.shuffle.api.ShuffleExecutorComponents
import org.apache.spark.util.collection.OpenHashSet

/**
* In sort-based shuffle, incoming records are sorted according to their target partition ids, then
* written to a single map output file. Reducers fetch contiguous regions of this file in order to
* read their portion of the map output. In cases where the map output data is too large to fit in
* memory, sorted subsets of the output can be spilled to disk and those on-disk files are merged
* to produce the final output file.
*
* Sort-based shuffle has two different write paths for producing its map output files:
*
* - Serialized sorting: used when all three of the following conditions hold:
* 1. The shuffle dependency specifies no map-side combine.
* 2. The shuffle serializer supports relocation of serialized values (this is currently
* supported by KryoSerializer and Spark SQL's custom serializers).
 *    3. The shuffle produces no more than 16777216 (2^24) output partitions.
* - Deserialized sorting: used to handle all other cases.
*
* -----------------------
* Serialized sorting mode
* -----------------------
*
* In the serialized sorting mode, incoming records are serialized as soon as they are passed to the
* shuffle writer and are buffered in a serialized form during sorting. This write path implements
* several optimizations:
*
* - Its sort operates on serialized binary data rather than Java objects, which reduces memory
* consumption and GC overheads. This optimization requires the record serializer to have certain
* properties to allow serialized records to be re-ordered without requiring deserialization.
* See SPARK-4550, where this optimization was first proposed and implemented, for more details.
*
* - It uses a specialized cache-efficient sorter ([[ShuffleExternalSorter]]) that sorts
* arrays of compressed record pointers and partition ids. By using only 8 bytes of space per
* record in the sorting array, this fits more of the array into cache.
*
* - The spill merging procedure operates on blocks of serialized records that belong to the same
* partition and does not need to deserialize records during the merge.
*
* - When the spill compression codec supports concatenation of compressed data, the spill merge
* simply concatenates the serialized and compressed spill partitions to produce the final output
* partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used
* and avoids the need to allocate decompression or copying buffers during the merge.
*
* For more details on these optimizations, see SPARK-7081.
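 *
 * For illustration only, a rough sketch of how the three serialized-sorting conditions above
 * map to code (this mirrors `canUseSerializedShuffle` below; `dep` stands for a hypothetical
 * ShuffleDependency):
 *
 * {{{
 *   val useSerializedSorting =
 *     !dep.mapSideCombine &&                                   // condition 1: no map-side combine
 *     dep.serializer.supportsRelocationOfSerializedObjects &&  // condition 2: relocatable records
 *     dep.partitioner.numPartitions <= 16777216                // condition 3: at most 2^24 partitions
 * }}}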
*/
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
import SortShuffleManager._
if (!conf.getBoolean("spark.shuffle.spill", true)) {
logWarning(
"spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
" Shuffle will continue to spill to disk when necessary.")
}
/**
* A mapping from shuffle ids to the task ids of mappers producing output for those shuffles.
*/
private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int, OpenHashSet[Long]]()
private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)
override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
/**
* Obtains a [[ShuffleHandle]] to pass to tasks.
*/
override def registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
// need map-side aggregation, then write numPartitions files directly and just concatenate
// them at the end. This avoids doing serialization and deserialization twice to merge
// together the spilled files, which would happen with the normal code path. The downside is
// having multiple files open at a time and thus more memory allocated to buffers.
new BypassMergeSortShuffleHandle[K, V](
shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
// Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
new SerializedShuffleHandle[K, V](
shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
// Otherwise, buffer map outputs in a deserialized form:
new BaseShuffleHandle(shuffleId, dependency)
}
}
/**
   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
   * read from a range of map outputs (startMapIndex to endMapIndex-1, inclusive).
   * If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the total number of
   * map outputs of the shuffle in `getMapSizesByExecutorId`.
*
* Called on executors by reduce tasks.
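   *
   * For illustration, a hypothetical call that reads reduce partitions 0 and 1 from all map
   * outputs of the shuffle (`shuffleManager`, `handle`, `context` and `metrics` are assumed
   * to be in scope):
   *
   * {{{
   *   val reader = shuffleManager.getReader[String, String](
   *     handle,
   *     startMapIndex = 0,
   *     endMapIndex = Int.MaxValue,
   *     startPartition = 0,
   *     endPartition = 2,
   *     context = context,
   *     metrics = metrics)
   * }}}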
*/
override def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
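    // If push-based shuffle merge has been finalized for this shuffle, fetch the merged
    // (push-based) block locations from the map output tracker, which also indicates whether
    // batch fetching can be enabled; otherwise use the regular per-map block locations.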
val (blocksByAddress, canEnableBatchFetch) =
if (baseShuffleHandle.dependency.isShuffleMergeFinalizedMarked) {
val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
(res.iter, res.enableBatchFetch)
} else {
val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
(address, true)
}
new BlockStoreShuffleReader(
      baseShuffleHandle, blocksByAddress, context, metrics,
shouldBatchFetch =
canEnableBatchFetch && canUseBatchFetch(startPartition, endPartition, context))
}
/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Long,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
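    // Record this map task's id for the shuffle, so that its output files can be cleaned up
    // when the shuffle is unregistered (see unregisterShuffle).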
val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
handle.shuffleId, _ => new OpenHashSet[Long](16))
mapTaskIds.synchronized { mapTaskIds.add(mapId) }
val env = SparkEnv.get
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf,
metrics,
shuffleExecutorComponents)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
bypassMergeSortHandle,
mapId,
env.conf,
metrics,
shuffleExecutorComponents)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(other, mapId, context, metrics, shuffleExecutorComponents)
}
}
/** Remove a shuffle's metadata from the ShuffleManager. */
override def unregisterShuffle(shuffleId: Int): Boolean = {
Option(taskIdMapsForShuffle.remove(shuffleId)).foreach { mapTaskIds =>
mapTaskIds.iterator.foreach { mapTaskId =>
shuffleBlockResolver.removeDataByMap(shuffleId, mapTaskId)
}
}
true
}
/** Shut down this ShuffleManager. */
override def stop(): Unit = {
shuffleBlockResolver.stop()
}
}

private[spark] object SortShuffleManager extends Logging {
/**
* The maximum number of shuffle output partitions that SortShuffleManager supports when
* buffering map outputs in a serialized form. This is an extreme defensive programming measure,
* since it's extremely unlikely that a single shuffle produces over 16 million output partitions.
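   * (The limit comes from PackedRecordPointer, which encodes the partition id in 24 bits, so
   * serialized mode can address at most 2^24 = 16777216 partitions.)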
*/
val MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE =
PackedRecordPointer.MAXIMUM_PARTITION_ID + 1
/**
   * The local property key for the continuous shuffle block fetching feature.
*/
val FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED_KEY =
"__fetch_continuous_blocks_in_batch_enabled"
/**
* Helper method for determining whether a shuffle reader should fetch the continuous blocks
* in batch.
*/
def canUseBatchFetch(startPartition: Int, endPartition: Int, context: TaskContext): Boolean = {
val fetchMultiPartitions = endPartition - startPartition > 1
fetchMultiPartitions &&
context.getLocalProperty(FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED_KEY) == "true"
}
/**
* Helper method for determining whether a shuffle should use an optimized serialized shuffle
* path or whether it should fall back to the original path that operates on deserialized objects.
*/
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
val shufId = dependency.shuffleId
val numPartitions = dependency.partitioner.numPartitions
if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
s"${dependency.serializer.getClass.getName}, does not support object relocation")
false
} else if (dependency.mapSideCombine) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
s"map-side aggregation")
false
} else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
false
} else {
log.debug(s"Can use serialized shuffle for shuffle $shufId")
true
}
}
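  /**
   * Loads the executor-side components of the configured shuffle I/O plugin (`ShuffleDataIO`)
   * and initializes them with the application id, the executor id, and any extra configuration
   * entries found under `ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX`.
   */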
private def loadShuffleExecutorComponents(conf: SparkConf): ShuffleExecutorComponents = {
val executorComponents = ShuffleDataIOUtils.loadShuffleDataIO(conf).executor()
val extraConfigs = conf.getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX)
.toMap
executorComponents.initializeExecutor(
conf.getAppId,
SparkEnv.get.executorId,
extraConfigs.asJava)
executorComponents
}
}

/**
* Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the
* serialized shuffle.
*/
private[spark] class SerializedShuffleHandle[K, V](
shuffleId: Int,
dependency: ShuffleDependency[K, V, V])
extends BaseShuffleHandle(shuffleId, dependency) {
}

/**
* Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the
* bypass merge sort shuffle path.
*/
private[spark] class BypassMergeSortShuffleHandle[K, V](
shuffleId: Int,
dependency: ShuffleDependency[K, V, V])
extends BaseShuffleHandle(shuffleId, dependency) {
}