[#1373][FOLLOWUP] fix(spark): shuffle manager rpc service invalid when partition data reassign is enabled (#1583)

### What changes were proposed in this pull request?
Fix the task block send failure retry parameter not taking effect. The driver now starts the shuffle manager RPC service when either stage resubmission or block send failure retry is enabled (`shuffleManagerRpcServiceEnabled = taskBlockSendFailureRetry || rssResubmitStage`), and the shuffle writer reads the retry flag with `RssSparkConfig.SPARK_RSS_CONFIG_PREFIX` applied to the key. The client option is also renamed from `rss.task.failed.retry.enabled` to `rss.client.blockSendFailureRetry.enabled`.
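
For reference, a minimal usage sketch (not part of this patch) showing how an application could enable the renamed option; the `spark.` prefix is assumed here to be the value of `RssSparkConfig.SPARK_RSS_CONFIG_PREFIX`:

```java
import org.apache.spark.SparkConf;

public class EnableBlockSendFailureRetry {
  public static void main(String[] args) {
    // Hypothetical sketch: enable block send failure retry so that the
    // shuffle manager RPC service is started on the driver even when stage
    // resubmission (RssClientConfig.RSS_RESUBMIT_STAGE) is disabled.
    // The "spark." prefix is an assumed value of SPARK_RSS_CONFIG_PREFIX.
    SparkConf conf = new SparkConf()
        .set("spark.rss.client.blockSendFailureRetry.enabled", "true");
  }
}
```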

### Why are the changes needed?
Fix: (#1373)

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Not necessary.

---------

Co-authored-by: shun01.ding <shun01.ding@vipshop.com>
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 5017442..4600bcc 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -128,6 +128,10 @@
       JavaUtils.newConcurrentMap();
   /** Whether to enable the dynamic shuffleServer function rewrite and reread functions */
   private boolean rssResubmitStage;
+
+  private boolean taskBlockSendFailureRetry;
+
+  private boolean shuffleManagerRpcServiceEnabled;
   /** A list of shuffleServer for Write failures */
   private Set<String> failuresShuffleServerIds = Sets.newHashSet();
   /**
@@ -222,11 +226,14 @@
     this.rssResubmitStage =
         rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
             && RssSparkShuffleUtils.isStageResubmitSupported();
+    this.taskBlockSendFailureRetry =
+        rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED);
+    this.shuffleManagerRpcServiceEnabled = taskBlockSendFailureRetry || rssResubmitStage;
     if (!sparkConf.getBoolean(RssSparkConfig.RSS_TEST_FLAG.key(), false)) {
       if (isDriver) {
         heartBeatScheduledExecutorService =
             ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
-        if (rssResubmitStage) {
+        if (shuffleManagerRpcServiceEnabled) {
           LOG.info("stage resubmit is supported and enabled");
           // start shuffle manager server
           rssConf.set(RPC_SERVER_PORT, 0);
@@ -376,7 +383,7 @@
 
     shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions());
     shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);
-    if (rssResubmitStage) {
+    if (shuffleManagerRpcServiceEnabled) {
       ShuffleHandleInfo handleInfo =
           new ShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
       shuffleIdToShuffleHandleInfo.put(shuffleId, handleInfo);
@@ -473,7 +480,7 @@
       int shuffleId = rssHandle.getShuffleId();
       String taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber();
       ShuffleHandleInfo shuffleHandleInfo;
-      if (rssResubmitStage) {
+      if (shuffleManagerRpcServiceEnabled) {
         // Get the ShuffleServer list from the Driver based on the shuffleId
         shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
       } else {
@@ -543,7 +550,7 @@
               + "]");
       start = System.currentTimeMillis();
       ShuffleHandleInfo shuffleHandleInfo;
-      if (rssResubmitStage) {
+      if (shuffleManagerRpcServiceEnabled) {
         // Get the ShuffleServer list from the Driver based on the shuffleId
         shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
       } else {
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index b360807..719aa80 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -137,6 +137,10 @@
   private Map<Integer, ShuffleHandleInfo> shuffleIdToShuffleHandleInfo;
   /** Whether to enable the dynamic shuffleServer function rewrite and reread functions */
   private boolean rssResubmitStage;
+
+  private boolean taskBlockSendFailureRetryEnabled;
+
+  private boolean shuffleManagerRpcServiceEnabled;
   /** A list of shuffleServer for Write failures */
   private Set<String> failuresShuffleServerIds;
   /**
@@ -239,10 +243,13 @@
     this.rssResubmitStage =
         rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
             && RssSparkShuffleUtils.isStageResubmitSupported();
+    this.taskBlockSendFailureRetryEnabled =
+        rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED);
+    this.shuffleManagerRpcServiceEnabled = taskBlockSendFailureRetryEnabled || rssResubmitStage;
     if (isDriver) {
       heartBeatScheduledExecutorService =
           ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
-      if (rssResubmitStage) {
+      if (shuffleManagerRpcServiceEnabled) {
         LOG.info("stage resubmit is supported and enabled");
         // start shuffle manager server
         rssConf.set(RPC_SERVER_PORT, 0);
@@ -471,7 +478,7 @@
 
     shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions());
     shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);
-    if (rssResubmitStage) {
+    if (shuffleManagerRpcServiceEnabled) {
       ShuffleHandleInfo handleInfo =
           new ShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
       shuffleIdToShuffleHandleInfo.put(shuffleId, handleInfo);
@@ -509,7 +516,7 @@
       writeMetrics = context.taskMetrics().shuffleWriteMetrics();
     }
     ShuffleHandleInfo shuffleHandleInfo;
-    if (rssResubmitStage) {
+    if (shuffleManagerRpcServiceEnabled) {
       // Get the ShuffleServer list from the Driver based on the shuffleId
       shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
     } else {
@@ -651,7 +658,7 @@
     final int partitionNum = rssShuffleHandle.getDependency().partitioner().numPartitions();
     int shuffleId = rssShuffleHandle.getShuffleId();
     ShuffleHandleInfo shuffleHandleInfo;
-    if (rssResubmitStage) {
+    if (shuffleManagerRpcServiceEnabled) {
       // Get the ShuffleServer list from the Driver based on the shuffleId
       shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
     } else {
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 2fc0340..c95bc17 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -112,7 +112,7 @@
   private final Set<Long> blockIds = Sets.newConcurrentHashSet();
   private TaskContext taskContext;
   private SparkConf sparkConf;
-  private boolean taskFailRetry;
+  private boolean blockSendFailureRetryEnabled;
 
   /** used by columnar rss shuffle writer implementation */
   protected final long taskAttemptId;
@@ -189,10 +189,11 @@
     this.taskFailureCallback = taskFailureCallback;
     this.taskContext = context;
     this.sparkConf = sparkConf;
-    this.taskFailRetry =
+    this.blockSendFailureRetryEnabled =
         sparkConf.getBoolean(
-            RssClientConf.RSS_TASK_FAILED_RETRY_ENABLED.key(),
-            RssClientConf.RSS_TASK_FAILED_RETRY_ENABLED.defaultValue());
+            RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
+                + RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(),
+            RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.defaultValue());
   }
 
   public RssShuffleWriter(
@@ -420,7 +421,7 @@
 
   private void checkIfBlocksFailed() {
     Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
-    if (taskFailRetry && !failedBlockIds.isEmpty()) {
+    if (blockSendFailureRetryEnabled && !failedBlockIds.isEmpty()) {
       Set<TrackingBlockStatus> shouldResendBlockSet = shouldResendBlockStatusSet(failedBlockIds);
       try {
         reSendFailedBlockIds(shouldResendBlockSet);
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index be269ce..ac432e0 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -203,10 +203,10 @@
               "This option is only valid when the remote storage path is specified. If ture, "
                   + "the remote storage conf will use the client side hadoop configuration loaded from the classpath.");
 
-  public static final ConfigOption<Boolean> RSS_TASK_FAILED_RETRY_ENABLED =
-      ConfigOptions.key("rss.task.failed.retry.enabled")
+  public static final ConfigOption<Boolean> RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED =
+      ConfigOptions.key("rss.client.blockSendFailureRetry.enabled")
           .booleanType()
           .defaultValue(false)
           .withDescription(
-              "Whether to support task write failed retry internal, default value is false.");
+              "Whether to support rss client block send failure retry, default value is false.");
 }