| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala |
| index bdb768ed5a..ffde0643c5 100644 |
| --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala |
| +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala |
| @@ -208,7 +208,8 @@ private[spark] class ExecutorAllocationManager( |
| // decommissioning without a shuffle service. |
| if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) || |
| (decommissionEnabled && |
| - conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) { |
| + conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) || |
| + conf.isRssEnable()) { |
| logWarning("Dynamic allocation without a shuffle service is an experimental feature.") |
| } else if (!testing) { |
| throw new SparkException("Dynamic allocation of executors requires the external " + |
| diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala |
| index f81d218278..ff729da76a 100644 |
| --- a/core/src/main/scala/org/apache/spark/SparkConf.scala |
| +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala |
| @@ -598,6 +598,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria |
| Utils.redact(this, getAll).sorted.map { case (k, v) => k + "=" + v }.mkString("\n") |
| } |
| |
| + /** |
| +   * Returns true if the remote shuffle service (RSS) is enabled, i.e. spark.shuffle.manager is set to RssShuffleManager. |
| + */ |
| + def isRssEnable(): Boolean = get("spark.shuffle.manager", "sort").contains("RssShuffleManager") |
| } |
| |
| private[spark] object SparkConf extends Logging { |
| diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala |
| index ddabb659b2..75eb6bc52f 100644 |
| --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala |
| +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala |
| @@ -2039,7 +2039,8 @@ private[spark] class DAGScheduler( |
| // if the cluster manager explicitly tells us that the entire worker was lost, then |
| // we know to unregister shuffle output. (Note that "worker" specifically refers to the process |
| // from a Standalone cluster, where the shuffle service lives in the Worker.) |
| - val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled |
| + val fileLost = (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled) && |
| + !sc.getConf.isRssEnable() |
| removeExecutorAndUnregisterOutputs( |
| execId = execId, |
| fileLost = fileLost, |
| diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala |
| index ad0791fa42..2d2ad52c71 100644 |
| --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala |
| +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala |
| @@ -973,7 +973,8 @@ private[spark] class TaskSetManager( |
| // and we are not using an external shuffle server which could serve the shuffle outputs. |
| // The reason is the next stage wouldn't be able to fetch the data from this dead executor |
| // so we would need to rerun these tasks on other executors. |
| - if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) { |
| + if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && |
| + !isZombie && !conf.isRssEnable()) { |
| for ((tid, info) <- taskInfos if info.executorId == execId) { |
| val index = taskInfos(tid).index |
| // We may have a running task whose partition has been marked as successful, |
| diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala |
| index ef84cd27a3..0b1ab0d50b 100644 |
| --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala |
| +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala |
| @@ -159,19 +159,23 @@ class ShuffledRowRDD( |
| } |
| |
| override def getPreferredLocations(partition: Partition): Seq[String] = { |
| - val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] |
| - partition.asInstanceOf[ShuffledRowRDDPartition].spec match { |
| - case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) => |
| - // TODO order by partition size. |
| - startReducerIndex.until(endReducerIndex).flatMap { reducerIndex => |
| - tracker.getPreferredLocationsForShuffle(dependency, reducerIndex) |
| - } |
| - |
| - case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex, _) => |
| - tracker.getMapLocation(dependency, startMapIndex, endMapIndex) |
| - |
| - case PartialMapperPartitionSpec(mapIndex, _, _) => |
| - tracker.getMapLocation(dependency, mapIndex, mapIndex + 1) |
| + if (!conf.isRssEnable()) { |
| + val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] |
| + partition.asInstanceOf[ShuffledRowRDDPartition].spec match { |
| + case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) => |
| + // TODO order by partition size. |
| + startReducerIndex.until(endReducerIndex).flatMap { reducerIndex => |
| + tracker.getPreferredLocationsForShuffle(dependency, reducerIndex) |
| + } |
| + |
| + case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex, _) => |
| + tracker.getMapLocation(dependency, startMapIndex, endMapIndex) |
| + |
| + case PartialMapperPartitionSpec(mapIndex, _, _) => |
| + tracker.getMapLocation(dependency, mapIndex, mapIndex + 1) |
| + } |
| + } else { |
| + Nil |
| } |
| } |
| |