| diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala |
| index 9ea6d2fa2f..d6101cdcef 100644 |
| --- a/core/src/main/scala/org/apache/spark/Dependency.scala |
| +++ b/core/src/main/scala/org/apache/spark/Dependency.scala |
| @@ -93,7 +93,9 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( |
| val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle( |
| shuffleId, _rdd.partitions.length, this) |
| |
| - _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) |
| + if (!_rdd.context.getConf.isRssEnable()) { |
| + _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) |
| + } |
| } |
| |
| |
| diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala |
| index 38bed797a0..a95d4c7d77 100644 |
| --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala |
| +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala |
| @@ -212,7 +212,7 @@ private[spark] class ExecutorAllocationManager( |
| } |
| // Require external shuffle service for dynamic allocation |
| // Otherwise, we may lose shuffle files when killing executors |
| - if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) { |
| + if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing && !conf.isRssEnable()) { |
| throw new SparkException("Dynamic allocation of executors requires the external " + |
| "shuffle service. You may enable this through spark.shuffle.service.enabled.") |
| } |
| diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala |
| index 9b8be58c79..f9437a21c0 100644 |
| --- a/core/src/main/scala/org/apache/spark/SparkConf.scala |
| +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala |
| @@ -629,6 +629,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria |
| Utils.redact(this, getAll).sorted.map { case (k, v) => k + "=" + v }.mkString("\n") |
| } |
| |
| + /** |
| + * Return true if the remote shuffle service (RSS) is enabled, i.e. spark.shuffle.manager is set to an RssShuffleManager. |
| + */ |
| + def isRssEnable(): Boolean = get("spark.shuffle.manager", "sort").contains("RssShuffleManager") |
| } |
| |
| private[spark] object SparkConf extends Logging { |
| diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala |
| index 9d6a8faa89..df27576b66 100644 |
| --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala |
| +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala |
| @@ -1742,7 +1742,7 @@ private[spark] class DAGScheduler( |
| // if the cluster manager explicitly tells us that the entire worker was lost, then |
| // we know to unregister shuffle output. (Note that "worker" specifically refers to the process |
| // from a Standalone cluster, where the shuffle service lives in the Worker.) |
| - val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled |
| + val fileLost = (workerLost || !env.blockManager.externalShuffleServiceEnabled) && !sc.getConf.isRssEnable() |
| removeExecutorAndUnregisterOutputs( |
| execId = execId, |
| fileLost = fileLost, |
| diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala |
| index 459f575ba7..f563368fa5 100644 |
| --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala |
| +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala |
| @@ -975,7 +975,7 @@ private[spark] class TaskSetManager( |
| // The reason is the next stage wouldn't be able to fetch the data from this dead executor |
| // so we would need to rerun these tasks on other executors. |
| if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled |
| - && !isZombie) { |
| + && !isZombie && !conf.isRssEnable()) { |
| for ((tid, info) <- taskInfos if info.executorId == execId) { |
| val index = taskInfos(tid).index |
| // We may have a running task whose partition has been marked as successful, |
| diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala |
| index 021ce2eac0..50c52b0091 100644 |
| --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala |
| +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala |
| @@ -149,9 +149,13 @@ class ShuffledRowRDD( |
| } |
| |
| override def getPreferredLocations(partition: Partition): Seq[String] = { |
| - val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] |
| - val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]] |
| - tracker.getPreferredLocationsForShuffle(dep, partition.index) |
| + if (!conf.isRssEnable()) { |
| + val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] |
| + val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]] |
| + tracker.getPreferredLocationsForShuffle(dep, partition.index) |
| + } else { |
| + Nil |
| + } |
| } |
| |
| override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { |