diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index 9ea6d2fa2f..d6101cdcef 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -93,7 +93,9 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
- _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
+ if (!_rdd.context.getConf.isRssEnable()) {
+ _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
+ }
}
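
With a remote shuffle service (RSS), shuffle data lives on dedicated shuffle servers rather than on executor-local disks, so registering the shuffle with the driver's ContextCleaner would only schedule local cleanup that has nothing to delete; removal of remote data is the RSS deployment's own responsibility. A minimal sketch of the effect, assuming this patch is applied and an RSS shuffle manager is on the classpath (the class name below is a stand-in, not a confirmed name):

    import org.apache.spark.{SparkConf, SparkContext}

    // "org.apache.spark.shuffle.RssShuffleManager" is illustrative; substitute the
    // shuffle manager class that your RSS deployment actually provides.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("rss-cleanup-guard")
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager")

    val sc = new SparkContext(conf)
    // With the guard in place, building this shuffle dependency skips
    // registerShuffleForCleanup, so the ContextCleaner never tries to remove
    // RSS-managed shuffle state when the shuffle goes out of scope.
    sc.parallelize(1 to 100).map(i => (i % 10, i)).reduceByKey(_ + _).count()
    sc.stop()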
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 38bed797a0..a95d4c7d77 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -212,7 +212,7 @@ private[spark] class ExecutorAllocationManager(
}
// Require external shuffle service for dynamic allocation
// Otherwise, we may lose shuffle files when killing executors
- if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) {
+ if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing && !conf.isRssEnable()) {
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
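
Dynamic allocation normally requires the external shuffle service so that shuffle files outlive killed executors; RSS provides the same guarantee by keeping the files off the executors entirely, hence the extra isRssEnable() escape hatch. A configuration sketch (manager class name again illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "false")
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager")
    // Before this patch, ExecutorAllocationManager would reject this conf with
    // SparkException("Dynamic allocation of executors requires the external
    // shuffle service. ..."); with the patch, conf.isRssEnable() short-circuits
    // the check and validation passes.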
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 9b8be58c79..f9437a21c0 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -629,6 +629,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
Utils.redact(this, getAll).sorted.map { case (k, v) => k + "=" + v }.mkString("\n")
}
+ /**
+ * Return true if the remote shuffle service (RSS) is enabled.
+ */
+ def isRssEnable(): Boolean = get("spark.shuffle.manager", "sort").contains("RssShuffleManager")
}
private[spark] object SparkConf extends Logging {
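
isRssEnable() is a substring match on spark.shuffle.manager (default "sort"): any value containing "RssShuffleManager", typically a fully qualified class name, counts as RSS. This keeps core Spark decoupled from the plugin's package name, at the cost of also matching any other manager whose class name embeds that string. A quick check of how it resolves, assuming this patch is applied:

    import org.apache.spark.SparkConf

    val sortConf = new SparkConf(false)  // loadDefaults = false; falls back to "sort"
    val rssConf = new SparkConf(false)
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager")

    assert(!sortConf.isRssEnable())  // "sort" does not contain "RssShuffleManager"
    assert(rssConf.isRssEnable())    // matches on the class-name substring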
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9d6a8faa89..df27576b66 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1742,7 +1742,7 @@ private[spark] class DAGScheduler(
// if the cluster manager explicitly tells us that the entire worker was lost, then
// we know to unregister shuffle output. (Note that "worker" specifically refers to the process
// from a Standalone cluster, where the shuffle service lives in the Worker.)
- val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
+ val fileLost = (workerLost || !env.blockManager.externalShuffleServiceEnabled) && !sc.getConf.isRssEnable()
removeExecutorAndUnregisterOutputs(
execId = execId,
fileLost = fileLost,
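
fileLost controls whether the lost executor's map outputs are unregistered from the MapOutputTracker, which would force upstream stages to recompute. Map output stored on RSS servers survives both executor and worker loss, so the predicate is forced to false whenever RSS is on. Restated as a standalone function (names illustrative):

    // Mirrors the patched computation above.
    def fileLost(workerLost: Boolean, externalShuffleService: Boolean, rssEnabled: Boolean): Boolean =
      (workerLost || !externalShuffleService) && !rssEnabled

    // No external shuffle service and no RSS: output died with the executor.
    assert(fileLost(workerLost = false, externalShuffleService = false, rssEnabled = false))
    // RSS holds the output, so even a whole-worker loss keeps it reachable.
    assert(!fileLost(workerLost = true, externalShuffleService = false, rssEnabled = true))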
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 459f575ba7..f563368fa5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -975,7 +975,7 @@ private[spark] class TaskSetManager(
// The reason is the next stage wouldn't be able to fetch the data from this dead executor
// so we would need to rerun these tasks on other executors.
if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled
- && !isZombie) {
+ && !isZombie && !conf.isRssEnable()) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
// We may have a running task whose partition has been marked as successful,
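
The same reasoning applies in TaskSetManager.executorLost: completed ShuffleMapTasks whose output lived on the dead executor are normally marked Resubmitted and rerun, but both the external shuffle service and RSS keep that output reachable. The patched condition as a standalone predicate (sketch):

    // True when finished shuffle-map output died with the executor and must be rerun.
    def mustResubmit(
        firstTaskIsShuffleMap: Boolean,
        externalShuffleService: Boolean,
        isZombie: Boolean,
        rssEnabled: Boolean): Boolean =
      firstTaskIsShuffleMap && !externalShuffleService && !isZombie && !rssEnabled

    // An RSS-backed stage keeps its map output across executor loss:
    assert(!mustResubmit(true, externalShuffleService = false, isZombie = false, rssEnabled = true))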
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index 021ce2eac0..50c52b0091 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -149,9 +149,13 @@ class ShuffledRowRDD(
}
override def getPreferredLocations(partition: Partition): Seq[String] = {
- val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
- val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
- tracker.getPreferredLocationsForShuffle(dep, partition.index)
+ if (!conf.isRssEnable()) {
+ val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+ val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
+ tracker.getPreferredLocationsForShuffle(dep, partition.index)
+ } else {
+ Nil
+ }
}
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
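
getPreferredLocations steers reduce tasks toward executors holding large fractions of the map output; under RSS the reducers fetch from remote shuffle servers instead, so executor-level locality is meaningless and returning Nil lets the scheduler place tasks anywhere. Taken together, the five guards in this patch all activate from a single setting. A consolidated conf sketch, assuming the patch is applied (manager class name illustrative):

    import org.apache.spark.SparkConf

    // One setting flips every guard in this patch.
    val conf = new SparkConf()
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager")
      .set("spark.dynamicAllocation.enabled", "true") // allowed without the ESS now
      .set("spark.shuffle.service.enabled", "false")  // ESS no longer required

    assert(conf.isRssEnable())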