#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index f06312c15cf..899af5df485 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -204,7 +204,9 @@ private[spark] class ExecutorAllocationManager(
s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
}
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
- if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
+ if (conf.isRssEnable()) {
+ logInfo("Dynamic allocation will use remote shuffle service")
+ } else if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
logInfo("Dynamic allocation is enabled without a shuffle service.")
} else if (decommissionEnabled &&
conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 08344d8e547..ff3bab6710d 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -580,6 +580,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
Utils.redact(this, getAll).sorted.map { case (k, v) => k + "=" + v }.mkString("\n")
}

+ /**
+ * Return true if remote shuffle service is enabled.
+ */
+ def isRssEnable(): Boolean = {
+ val shuffleMgr = get("spark.shuffle.manager", "sort")
+ shuffleMgr.contains("RssShuffleManager") || shuffleMgr.contains("UniffleShuffleManager")
+ }
}

private[spark] object SparkConf extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 2a966fab6f0..89cfdfe8082 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2515,7 +2515,8 @@ private[spark] class DAGScheduler(
// if the cluster manager explicitly tells us that the entire worker was lost, then
// we know to unregister shuffle output. (Note that "worker" specifically refers to the process
// from a Standalone cluster, where the shuffle service lives in the Worker.)
- val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
+ val fileLost = (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled) &&
+ !sc.getConf.isRssEnable()
removeExecutorAndUnregisterOutputs(
execId = execId,
fileLost = fileLost,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 124a27502fe..38e6c9bca21 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1053,7 +1053,7 @@ private[spark] class TaskSetManager(
// could serve the shuffle outputs or the executor lost is caused by decommission (which
// can destroy the whole host). The reason is the next stage wouldn't be able to fetch the
// data from this dead executor so we would need to rerun these tasks on other executors.
- val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
+ val maybeShuffleMapOutputLoss = isShuffleMapTasks && !conf.isRssEnable() &&
(reason.isInstanceOf[ExecutorDecommission] || !env.blockManager.externalShuffleServiceEnabled)
if (maybeShuffleMapOutputLoss && !isZombie) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index 367732dbb20..ad42e827271 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -166,6 +166,9 @@ class ShuffledRowRDD(
}

override def getPreferredLocations(partition: Partition): Seq[String] = {
+ if (conf.isRssEnable()) {
+ return Nil
+ }
val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
partition.asInstanceOf[ShuffledRowRDDPartition].spec match {
case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
--
2.39.3 (Apple Git-145)
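For reference, below is a minimal, standalone Scala sketch (not part of the patch above) of the check performed by the new SparkConf.isRssEnable() helper. The object name and the shuffle-manager class name set on the conf are illustrative assumptions; use whichever shuffle manager class your remote shuffle service client provides.

import org.apache.spark.SparkConf

object IsRssEnableSketch {
  // Mirrors the helper added to SparkConf above: the remote shuffle service is
  // considered enabled when spark.shuffle.manager points at an RSS/Uniffle
  // shuffle manager instead of the built-in sort-based manager.
  def isRssEnable(conf: SparkConf): Boolean = {
    val shuffleMgr = conf.get("spark.shuffle.manager", "sort")
    shuffleMgr.contains("RssShuffleManager") || shuffleMgr.contains("UniffleShuffleManager")
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      // Illustrative class name; substitute the one shipped with your RSS client jar.
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "false")

    // When this returns true, the patched code paths skip the shuffle-tracking
    // requirement for dynamic allocation, keep map outputs registered on
    // executor loss, and report no preferred locations for shuffled reads.
    println(s"isRssEnable = ${isRssEnable(conf)}")  // prints: isRssEnable = true
  }
}

A substring match on the class name (rather than an equality check against one fully qualified name) is what the patch itself uses, so any shuffle manager whose class name contains RssShuffleManager or UniffleShuffleManager is treated as a remote shuffle service.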