blob: c3431dca4bcf961e8af6ac9d0f7ce4674afba984 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index d21b9d9833..fe4507f81f 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -110,8 +110,10 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
def getMergerLocs: Seq[BlockManagerId] = mergerLocs
- _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
- _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
+ if (!_rdd.context.getConf.isRssEnable()) {
+ _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
+ _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index bdb768ed5a..ffde0643c5 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -208,7 +208,8 @@ private[spark] class ExecutorAllocationManager(
// decommissioning without a shuffle service.
if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
(decommissionEnabled &&
- conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
+ conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) ||
+ conf.isRssEnable()) {
logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
} else if (!testing) {
throw new SparkException("Dynamic allocation of executors requires the external " +
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index f81d218278..ff729da76a 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -598,6 +598,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
Utils.redact(this, getAll).sorted.map { case (k, v) => k + "=" + v }.mkString("\n")
}
+ /**
+ * Return true if remote shuffle service is enabled.
+ */
+ def isRssEnable(): Boolean = get("spark.shuffle.manager", "sort").contains("RssShuffleManager")
}
private[spark] object SparkConf extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ddabb659b2..75eb6bc52f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2039,7 +2039,8 @@ private[spark] class DAGScheduler(
// if the cluster manager explicitly tells us that the entire worker was lost, then
// we know to unregister shuffle output. (Note that "worker" specifically refers to the process
// from a Standalone cluster, where the shuffle service lives in the Worker.)
- val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
+ val fileLost = (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled) &&
+ !sc.getConf.isRssEnable()
removeExecutorAndUnregisterOutputs(
execId = execId,
fileLost = fileLost,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index ad0791fa42..2d2ad52c71 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -973,7 +973,8 @@ private[spark] class TaskSetManager(
// and we are not using an external shuffle server which could serve the shuffle outputs.
// The reason is the next stage wouldn't be able to fetch the data from this dead executor
// so we would need to rerun these tasks on other executors.
- if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) {
+ if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled &&
+ !isZombie && !conf.isRssEnable()) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
// We may have a running task whose partition has been marked as successful,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index ef84cd27a3..0b1ab0d50b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -159,19 +159,23 @@ class ShuffledRowRDD(
}
override def getPreferredLocations(partition: Partition): Seq[String] = {
- val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
- partition.asInstanceOf[ShuffledRowRDDPartition].spec match {
- case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
- // TODO order by partition size.
- startReducerIndex.until(endReducerIndex).flatMap { reducerIndex =>
- tracker.getPreferredLocationsForShuffle(dependency, reducerIndex)
- }
-
- case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex, _) =>
- tracker.getMapLocation(dependency, startMapIndex, endMapIndex)
-
- case PartialMapperPartitionSpec(mapIndex, _, _) =>
- tracker.getMapLocation(dependency, mapIndex, mapIndex + 1)
+ if (!conf.isRssEnable()) {
+ val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+ partition.asInstanceOf[ShuffledRowRDDPartition].spec match {
+ case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+ // TODO order by partition size.
+ startReducerIndex.until(endReducerIndex).flatMap { reducerIndex =>
+ tracker.getPreferredLocationsForShuffle(dependency, reducerIndex)
+ }
+
+ case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex, _) =>
+ tracker.getMapLocation(dependency, startMapIndex, endMapIndex)
+
+ case PartialMapperPartitionSpec(mapIndex, _, _) =>
+ tracker.getMapLocation(dependency, mapIndex, mapIndex + 1)
+ }
+ } else {
+ Nil
}
}