| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.shuffle.sort |
| |
| import java.util.concurrent.ConcurrentHashMap |
| |
| import scala.jdk.CollectionConverters._ |
| |
| import org.apache.spark._ |
| import org.apache.spark.internal.Logging |
| import org.apache.spark.shuffle._ |
| import org.apache.spark.shuffle.api.ShuffleExecutorComponents |
| import org.apache.spark.util.collection.OpenHashSet |
| |
| /** |
| * In sort-based shuffle, incoming records are sorted according to their target partition ids, then |
| * written to a single map output file. Reducers fetch contiguous regions of this file in order to |
| * read their portion of the map output. In cases where the map output data is too large to fit in |
| * memory, sorted subsets of the output can be spilled to disk and those on-disk files are merged |
| * to produce the final output file. |
| * |
| * Sort-based shuffle has two different write paths for producing its map output files: |
| * |
| * - Serialized sorting: used when all three of the following conditions hold: |
| * 1. The shuffle dependency specifies no map-side combine. |
| * 2. The shuffle serializer supports relocation of serialized values (this is currently |
| * supported by KryoSerializer and Spark SQL's custom serializers). |
| * 3. The shuffle produces fewer than or equal to 16777216 output partitions. |
| * - Deserialized sorting: used to handle all other cases. |
| * |
| * ----------------------- |
| * Serialized sorting mode |
| * ----------------------- |
| * |
| * In the serialized sorting mode, incoming records are serialized as soon as they are passed to the |
| * shuffle writer and are buffered in a serialized form during sorting. This write path implements |
| * several optimizations: |
| * |
| * - Its sort operates on serialized binary data rather than Java objects, which reduces memory |
| * consumption and GC overheads. This optimization requires the record serializer to have certain |
| * properties to allow serialized records to be re-ordered without requiring deserialization. |
| * See SPARK-4550, where this optimization was first proposed and implemented, for more details. |
| * |
| * - It uses a specialized cache-efficient sorter ([[ShuffleExternalSorter]]) that sorts |
| * arrays of compressed record pointers and partition ids. By using only 8 bytes of space per |
| * record in the sorting array, this fits more of the array into cache. |
| * |
| * - The spill merging procedure operates on blocks of serialized records that belong to the same |
| * partition and does not need to deserialize records during the merge. |
| * |
| * - When the spill compression codec supports concatenation of compressed data, the spill merge |
| * simply concatenates the serialized and compressed spill partitions to produce the final output |
| * partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used |
| * and avoids the need to allocate decompression or copying buffers during the merge. |
| * |
| * For more details on these optimizations, see SPARK-7081. |
| */ |
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {

  import SortShuffleManager._

  // "spark.shuffle.spill" is no longer honored; warn when it has been explicitly disabled.
  if (!conf.getBoolean("spark.shuffle.spill", true)) {
    logWarning(
      "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
        " Shuffle will continue to spill to disk when necessary.")
  }

  /**
   * For every shuffle id, the ids of the map tasks that requested a writer for that shuffle.
   * Filled in by `getWriter` and drained by `unregisterShuffle`.
   */
  private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int, OpenHashSet[Long]]()

  // Lazy: the executor components are only loaded the first time a writer needs them.
  private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)

  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)

  /**
   * Selects the write path for a shuffle and returns the corresponding [[ShuffleHandle]], which
   * is later passed to tasks.
   */
  override def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    // The two specialized handles are declared with the combiner type equal to the value type.
    def valueTypedDependency: ShuffleDependency[K, V, V] =
      dependency.asInstanceOf[ShuffleDependency[K, V, V]]

    val bypassMergeSort = SortShuffleWriter.shouldBypassMergeSort(conf, dependency)
    if (bypassMergeSort) {
      // With fewer than spark.shuffle.sort.bypassMergeThreshold partitions and no map-side
      // aggregation, numPartitions files are written directly and simply concatenated at the
      // end. That avoids serializing and deserializing twice to merge spilled files, as the
      // normal code path would, at the price of keeping several files open at once and thus
      // allocating more memory to buffers.
      new BypassMergeSortShuffleHandle[K, V](shuffleId, valueTypedDependency)
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // Next preference: buffer map outputs in serialized form, since this is more efficient.
      new SerializedShuffleHandle[K, V](shuffleId, valueTypedDependency)
    } else {
      // Fallback: buffer map outputs in deserialized form.
      new BaseShuffleHandle(shuffleId, dependency)
    }
  }

  /**
   * Creates a reader for the reduce partitions startPartition to endPartition-1 (inclusive),
   * restricted to the map outputs startMapIndex to endMapIndex-1 (inclusive).
   * If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length of total
   * map outputs of the shuffle in `getMapSizesByExecutorId`.
   *
   * Called on executors by reduce tasks.
   */
  override def getReader[K, C](
      handle: ShuffleHandle,
      startMapIndex: Int,
      endMapIndex: Int,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
    val typedHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
    // Look up the block locations; the lookup used depends on whether the shuffle merge has
    // been marked as finalized. Only that lookup reports its own batch-fetch flag.
    val (blockLocations, batchFetchAllowed) =
      if (typedHandle.dependency.isShuffleMergeFinalizedMarked) {
        val pushBased = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
          handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
        (pushBased.iter, pushBased.enableBatchFetch)
      } else {
        val sizesByExecutor = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
          handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
        (sizesByExecutor, true)
      }
    val batchFetch =
      batchFetchAllowed && canUseBatchFetch(startPartition, endPartition, context)
    new BlockStoreShuffleReader(
      typedHandle, blockLocations, context, metrics, shouldBatchFetch = batchFetch)
  }

  /**
   * Creates the writer matching the kind of `handle` for the map task `mapId`, and records the
   * task id so that `unregisterShuffle` can remove its output later.
   *
   * Called on executors by map tasks.
   */
  override def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext,
      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
    val registeredMapIds = taskIdMapsForShuffle.computeIfAbsent(
      handle.shuffleId, _ => new OpenHashSet[Long](16))
    // Additions to the per-shuffle set are guarded by the set's own monitor.
    registeredMapIds.synchronized { registeredMapIds.add(mapId) }

    val sparkEnv = SparkEnv.get
    // The two specialized handle types must be tested before the general BaseShuffleHandle
    // case, because both of them extend it.
    handle match {
      case serializedHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
        new UnsafeShuffleWriter(
          sparkEnv.blockManager,
          context.taskMemoryManager(),
          serializedHandle,
          mapId,
          context,
          sparkEnv.conf,
          metrics,
          shuffleExecutorComponents)
      case bypassHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
        new BypassMergeSortShuffleWriter(
          sparkEnv.blockManager,
          bypassHandle,
          mapId,
          sparkEnv.conf,
          metrics,
          shuffleExecutorComponents)
      case baseHandle: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
        new SortShuffleWriter(baseHandle, mapId, context, metrics, shuffleExecutorComponents)
    }
  }

  /**
   * Forgets the given shuffle and asks the block resolver to remove the data of every map task
   * recorded for it. Always returns true.
   */
  override def unregisterShuffle(shuffleId: Int): Boolean = {
    for {
      recordedMapIds <- Option(taskIdMapsForShuffle.remove(shuffleId))
      recordedMapId <- recordedMapIds.iterator
    } {
      shuffleBlockResolver.removeDataByMap(shuffleId, recordedMapId)
    }
    true
  }

  /** Shuts down this ShuffleManager by stopping its block resolver. */
  override def stop(): Unit = {
    shuffleBlockResolver.stop()
  }
}
| |
| |
private[spark] object SortShuffleManager extends Logging {

  /**
   * Upper bound on the number of shuffle output partitions that SortShuffleManager supports
   * when map outputs are buffered in serialized form. This is an extreme defensive programming
   * measure, since a single shuffle producing over 16 million output partitions is extremely
   * unlikely.
   */
  val MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE =
    PackedRecordPointer.MAXIMUM_PARTITION_ID + 1

  /**
   * Local property key for the continuous shuffle block fetching feature.
   */
  val FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED_KEY =
    "__fetch_continuous_blocks_in_batch_enabled"

  /**
   * Decides whether a shuffle reader should fetch continuous blocks in batch: the requested
   * range must span more than one partition and the local property
   * [[FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED_KEY]] of the task context must equal "true".
   */
  def canUseBatchFetch(startPartition: Int, endPartition: Int, context: TaskContext): Boolean = {
    val spansSeveralPartitions = endPartition - startPartition > 1
    spansSeveralPartitions &&
      context.getLocalProperty(FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED_KEY) == "true"
  }

  /**
   * Decides whether a shuffle can take the optimized serialized shuffle path or has to fall
   * back to the original path that operates on deserialized objects. The outcome, including
   * the reason for a rejection, is logged at debug level.
   */
  def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
    val shufId = dependency.shuffleId
    val numPartitions = dependency.partitioner.numPartitions
    // Debug message for the first requirement that is not met; None when all of them hold.
    val rejection: Option[String] =
      if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
        Some(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
          s"${dependency.serializer.getClass.getName}, does not support object relocation")
      } else if (dependency.mapSideCombine) {
        Some(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
          s"map-side aggregation")
      } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
        Some(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
          s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
      } else {
        None
      }
    log.debug(rejection.getOrElse(s"Can use serialized shuffle for shuffle $shufId"))
    rejection.isEmpty
  }

  /**
   * Loads the executor-side shuffle components for `conf` and initializes them with the
   * application id, the executor id and every configuration entry whose key starts with
   * `ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX`.
   */
  private def loadShuffleExecutorComponents(conf: SparkConf): ShuffleExecutorComponents = {
    val components = ShuffleDataIOUtils.loadShuffleDataIO(conf).executor()
    val prefixedConfigs =
      conf.getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX).toMap
    components.initializeExecutor(
      conf.getAppId, SparkEnv.get.executorId, prefixedConfigs.asJava)
    components
  }
}
| |
/**
 * Marker subclass of [[BaseShuffleHandle]]: receiving a handle of this type means that the
 * serialized shuffle path was chosen for the shuffle.
 */
private[spark] class SerializedShuffleHandle[K, V](
    shuffleId: Int,
    dependency: ShuffleDependency[K, V, V])
  extends BaseShuffleHandle(shuffleId, dependency)
| |
/**
 * Marker subclass of [[BaseShuffleHandle]]: receiving a handle of this type means that the
 * bypass merge sort shuffle path was chosen for the shuffle.
 */
private[spark] class BypassMergeSortShuffleHandle[K, V](
    shuffleId: Int,
    dependency: ShuffleDependency[K, V, V])
  extends BaseShuffleHandle(shuffleId, dependency)