[#1373][FOLLOWUP] fix(spark): shuffle manager rpc service invalid when partition data reassign is enabled (#1583)
### What changes were proposed in this pull request?
Fix the task-fail-retry parameter not taking effect, and add the task-fail-retry parameter in the other locations where it is needed
### Why are the changes needed?
Fix: (#1373)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No new tests are necessary; the change is covered by existing tests.
---------
Co-authored-by: shun01.ding <shun01.ding@vipshop.com>
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 5017442..4600bcc 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -128,6 +128,10 @@
JavaUtils.newConcurrentMap();
/** Whether to enable the dynamic shuffleServer function rewrite and reread functions */
private boolean rssResubmitStage;
+
+ private boolean taskBlockSendFailureRetry;
+
+ private boolean shuffleManagerRpcServiceEnabled;
/** A list of shuffleServer for Write failures */
private Set<String> failuresShuffleServerIds = Sets.newHashSet();
/**
@@ -222,11 +226,14 @@
this.rssResubmitStage =
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
+ this.taskBlockSendFailureRetry =
+ rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED);
+ this.shuffleManagerRpcServiceEnabled = taskBlockSendFailureRetry || rssResubmitStage;
if (!sparkConf.getBoolean(RssSparkConfig.RSS_TEST_FLAG.key(), false)) {
if (isDriver) {
heartBeatScheduledExecutorService =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
- if (rssResubmitStage) {
+ if (shuffleManagerRpcServiceEnabled) {
LOG.info("stage resubmit is supported and enabled");
// start shuffle manager server
rssConf.set(RPC_SERVER_PORT, 0);
@@ -376,7 +383,7 @@
shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions());
shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);
- if (rssResubmitStage) {
+ if (shuffleManagerRpcServiceEnabled) {
ShuffleHandleInfo handleInfo =
new ShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
shuffleIdToShuffleHandleInfo.put(shuffleId, handleInfo);
@@ -473,7 +480,7 @@
int shuffleId = rssHandle.getShuffleId();
String taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber();
ShuffleHandleInfo shuffleHandleInfo;
- if (rssResubmitStage) {
+ if (shuffleManagerRpcServiceEnabled) {
// Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
} else {
@@ -543,7 +550,7 @@
+ "]");
start = System.currentTimeMillis();
ShuffleHandleInfo shuffleHandleInfo;
- if (rssResubmitStage) {
+ if (shuffleManagerRpcServiceEnabled) {
// Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
} else {
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index b360807..719aa80 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -137,6 +137,10 @@
private Map<Integer, ShuffleHandleInfo> shuffleIdToShuffleHandleInfo;
/** Whether to enable the dynamic shuffleServer function rewrite and reread functions */
private boolean rssResubmitStage;
+
+ private boolean taskBlockSendFailureRetryEnabled;
+
+ private boolean shuffleManagerRpcServiceEnabled;
/** A list of shuffleServer for Write failures */
private Set<String> failuresShuffleServerIds;
/**
@@ -239,10 +243,13 @@
this.rssResubmitStage =
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
+ this.taskBlockSendFailureRetryEnabled =
+ rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED);
+ this.shuffleManagerRpcServiceEnabled = taskBlockSendFailureRetryEnabled || rssResubmitStage;
if (isDriver) {
heartBeatScheduledExecutorService =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
- if (rssResubmitStage) {
+ if (shuffleManagerRpcServiceEnabled) {
LOG.info("stage resubmit is supported and enabled");
// start shuffle manager server
rssConf.set(RPC_SERVER_PORT, 0);
@@ -471,7 +478,7 @@
shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions());
shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);
- if (rssResubmitStage) {
+ if (shuffleManagerRpcServiceEnabled) {
ShuffleHandleInfo handleInfo =
new ShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
shuffleIdToShuffleHandleInfo.put(shuffleId, handleInfo);
@@ -509,7 +516,7 @@
writeMetrics = context.taskMetrics().shuffleWriteMetrics();
}
ShuffleHandleInfo shuffleHandleInfo;
- if (rssResubmitStage) {
+ if (shuffleManagerRpcServiceEnabled) {
// Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
} else {
@@ -651,7 +658,7 @@
final int partitionNum = rssShuffleHandle.getDependency().partitioner().numPartitions();
int shuffleId = rssShuffleHandle.getShuffleId();
ShuffleHandleInfo shuffleHandleInfo;
- if (rssResubmitStage) {
+ if (shuffleManagerRpcServiceEnabled) {
// Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
} else {
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 2fc0340..c95bc17 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -112,7 +112,7 @@
private final Set<Long> blockIds = Sets.newConcurrentHashSet();
private TaskContext taskContext;
private SparkConf sparkConf;
- private boolean taskFailRetry;
+ private boolean blockSendFailureRetryEnabled;
/** used by columnar rss shuffle writer implementation */
protected final long taskAttemptId;
@@ -189,10 +189,11 @@
this.taskFailureCallback = taskFailureCallback;
this.taskContext = context;
this.sparkConf = sparkConf;
- this.taskFailRetry =
+ this.blockSendFailureRetryEnabled =
sparkConf.getBoolean(
- RssClientConf.RSS_TASK_FAILED_RETRY_ENABLED.key(),
- RssClientConf.RSS_TASK_FAILED_RETRY_ENABLED.defaultValue());
+ RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
+ + RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(),
+ RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.defaultValue());
}
public RssShuffleWriter(
@@ -420,7 +421,7 @@
private void checkIfBlocksFailed() {
Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
- if (taskFailRetry && !failedBlockIds.isEmpty()) {
+ if (blockSendFailureRetryEnabled && !failedBlockIds.isEmpty()) {
Set<TrackingBlockStatus> shouldResendBlockSet = shouldResendBlockStatusSet(failedBlockIds);
try {
reSendFailedBlockIds(shouldResendBlockSet);
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index be269ce..ac432e0 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -203,10 +203,10 @@
"This option is only valid when the remote storage path is specified. If ture, "
+ "the remote storage conf will use the client side hadoop configuration loaded from the classpath.");
- public static final ConfigOption<Boolean> RSS_TASK_FAILED_RETRY_ENABLED =
- ConfigOptions.key("rss.task.failed.retry.enabled")
+ public static final ConfigOption<Boolean> RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED =
+ ConfigOptions.key("rss.client.blockSendFailureRetry.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
- "Whether to support task write failed retry internal, default value is false.");
+ "Whether to support rss client block send failure retry, default value is false.");
}