[#731][FOLLOWUP] feat(Spark): Configure blockIdLayout for Spark based on max partitions (#1566)

### What changes were proposed in this pull request?
The configuration of the block id layout for Spark2 and Spark3 can be simplified by providing only the maximum number of partitions. Increments `SHUFFLE_SERVER_VERSION`, as the "new" Spark client has to connect to a "new" shuffle server.

### Why are the changes needed?
Currently, optimally configuring the block id layout for Spark is quite complex: https://github.com/apache/incubator-uniffle/pull/1528/files#diff-09ce7eaa98815d62ca00b2a8b0a45b0a922b047014c1f91dc17081b3fef8e7a8R106-R112

Three values have to be provided: the number of bits for the sequence number, the partition id, and the task attempt id. The task attempt id needs more bits than the partition id, because it packs the map index together with the attempt number, so the maximum number of attempts has to fit as well. This also requires accounting for speculative execution, which can add one more attempt.
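
For illustration, the attempt-related part of that computation looks roughly like this (a minimal sketch mirroring the `getMaxAttemptNo` / `getAttemptIdBits` helpers added in this PR; the class name is illustrative):

```java
public class AttemptBitsSketch {
  public static void main(String[] args) {
    int maxFailures = 4;        // spark.task.maxFailures
    boolean speculation = true; // spark.speculation
    // attempt numbers are zero-based: 0 .. maxFailures-1;
    // speculative execution can add one extra attempt
    int maxAttemptNo = (maxFailures < 1 ? 0 : maxFailures - 1) + (speculation ? 1 : 0);
    // number of bits needed to store attempt numbers 0 .. maxAttemptNo
    int attemptIdBits = 32 - Integer.numberOfLeadingZeros(maxAttemptNo);
    System.out.println(maxAttemptNo + " -> " + attemptIdBits + " bits"); // prints "4 -> 3 bits"
  }
}
```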

The RssShuffleManager can do this computation itself and derive the optimal block id layout configuration from the maximum number of partitions alone.
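
Given only the maximum number of partitions, the derivation boils down to the following (a simplified sketch of `configureBlockIdLayoutFromMaxPartitions` below, omitting validation and the 31-bit cap on the sequence number):

```java
public class LayoutSketch {
  public static void main(String[] args) {
    int maxPartitions = 1048576; // spark.rss.blockId.maxPartitions (default)
    int attemptIdBits = 2;       // derived from spark.task.maxFailures=4, spark.speculation=false
    // bits needed to address partition ids 0 .. maxPartitions-1
    int partitionIdBits = 32 - Integer.numberOfLeadingZeros(maxPartitions - 1); // 20
    // the task attempt id packs the map index (bounded by maxPartitions) plus the attempt number
    int taskAttemptIdBits = partitionIdBits + attemptIdBits; // 22
    // whatever remains of the 63 usable block id bits goes to the sequence number
    int sequenceNoBits = 63 - partitionIdBits - taskAttemptIdBits; // 21
    System.out.printf("seq=%d part=%d task=%d%n", sequenceNoBits, partitionIdBits, taskAttemptIdBits);
  }
}
```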

### Does this PR introduce _any_ user-facing change?
Yes, this adds the optional configuration `spark.rss.blockId.maxPartitions`.
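
As a usage sketch (assuming `spark.task.maxFailures=4` and `spark.speculation=false`), setting this single option is enough; the client derives and materializes the three bit-field properties itself:

```java
import org.apache.spark.SparkConf;

public class MaxPartitionsExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    // the only block id layout setting the user has to provide
    conf.set("spark.rss.blockId.maxPartitions", "131072");
    // the RssShuffleManager then derives and writes back the equivalent of:
    //   spark.rss.blockId.sequenceNoBits=27
    //   spark.rss.blockId.partitionIdBits=17
    //   spark.rss.blockId.taskAttemptIdBits=19
  }
}
```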

### How was this patch tested?
Unit and integration tests.
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index f0f2581..ee1278c 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -373,6 +373,15 @@
                   .doc("Whether to enable the resubmit stage."))
           .createWithDefault(false);
 
+  public static final ConfigEntry<Integer> RSS_MAX_PARTITIONS =
+      createIntegerBuilder(
+              new ConfigBuilder("spark.rss.blockId.maxPartitions")
+                  .doc(
+                      "Sets the maximum number of partitions to be supported by block ids. "
+                          + "This determines the bits reserved in block ids for the "
+                          + "sequence number, the partition id and the task attempt id."))
+          .createWithDefault(1048576);
+
   // spark2 doesn't have this key defined
   public static final String SPARK_SHUFFLE_COMPRESS_KEY = "spark.shuffle.compress";
 
diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 14c99b8..c75207b 100644
--- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -19,10 +19,14 @@
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.MapOutputTracker;
@@ -38,6 +42,8 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.config.ConfigOption;
+import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 
@@ -51,6 +57,213 @@
   private Method registerShuffleMethod;
 
   /** See static overload of this method. */
+  public abstract void configureBlockIdLayout(SparkConf sparkConf, RssConf rssConf);
+
+  /**
+   * Derives the block id layout config from the maximum number of allowed partitions. This value
+   * can be set in either SparkConf or RssConf via RssSparkConfig.RSS_MAX_PARTITIONS, where
+   * SparkConf takes precedence.
+   *
+   * <p>Computes the number of bits required for the partition id and the task attempt id, and
+   * reserves the remaining bits for the sequence number. Adds RssClientConf.BLOCKID_SEQUENCE_NO_BITS,
+   * RssClientConf.BLOCKID_PARTITION_ID_BITS, and RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS to the
+   * given RssConf and adds them prefixed with "spark." to the given SparkConf.
+   *
+   * <p>If RssSparkConfig.RSS_MAX_PARTITIONS is not set, existing values for
+   * RssClientConf.BLOCKID_SEQUENCE_NO_BITS, RssClientConf.BLOCKID_PARTITION_ID_BITS, and
+   * RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS are copied between the two configs.
+   *
+   * <p>Then, BlockIdLayout can be created consistently from either config:
+   *
+   * <p>{@code BlockIdLayout.from(rssConf)} and {@code BlockIdLayout.from(RssSparkConfig.toRssConf(sparkConf))}
+   *
+   * @param sparkConf Spark config providing max partitions
+   * @param rssConf Rss config to amend
+   * @param maxFailures Spark max failures
+   * @param speculation Spark speculative execution
+   */
+  @VisibleForTesting
+  protected static void configureBlockIdLayout(
+      SparkConf sparkConf, RssConf rssConf, int maxFailures, boolean speculation) {
+    if (sparkConf.contains(RssSparkConfig.RSS_MAX_PARTITIONS.key())) {
+      configureBlockIdLayoutFromMaxPartitions(sparkConf, rssConf, maxFailures, speculation);
+    } else {
+      configureBlockIdLayoutFromLayoutConfig(sparkConf, rssConf, maxFailures, speculation);
+    }
+  }
+
+  private static void configureBlockIdLayoutFromMaxPartitions(
+      SparkConf sparkConf, RssConf rssConf, int maxFailures, boolean speculation) {
+    int maxPartitions =
+        sparkConf.getInt(
+            RssSparkConfig.RSS_MAX_PARTITIONS.key(),
+            RssSparkConfig.RSS_MAX_PARTITIONS.defaultValue().get());
+    if (maxPartitions <= 1) {
+      throw new IllegalArgumentException(
+          "Value of "
+              + RssSparkConfig.RSS_MAX_PARTITIONS.key()
+              + " must be larger than 1: "
+              + maxPartitions);
+    }
+
+    int attemptIdBits = getAttemptIdBits(getMaxAttemptNo(maxFailures, speculation));
+    int partitionIdBits = 32 - Integer.numberOfLeadingZeros(maxPartitions - 1); // [1..31]
+    int taskAttemptIdBits = partitionIdBits + attemptIdBits; // [1+attemptIdBits..31+attemptIdBits]
+    int sequenceNoBits = 63 - partitionIdBits - taskAttemptIdBits; // [1-attemptIdBits..61]
+
+    if (taskAttemptIdBits > 31) {
+      throw new IllegalArgumentException(
+          "Cannot support "
+              + RssSparkConfig.RSS_MAX_PARTITIONS.key()
+              + "="
+              + maxPartitions
+              + " partitions, "
+              + "as this would require to reserve more than 31 bits "
+              + "in the block id for task attempt ids. "
+              + "With spark.maxFailures="
+              + maxFailures
+              + " and spark.speculation="
+              + (speculation ? "true" : "false")
+              + " at most "
+              + (1 << (31 - attemptIdBits))
+              + " partitions can be supported.");
+    }
+
+    // we have to cap the sequence number bits at 31 bits,
+    // because BlockIdLayout imposes an upper bound of 31 bits
+    // which is fine as this allows for over 2bn sequence ids
+    if (sequenceNoBits > 31) {
+      // move spare bits (bits over 31) from sequence number to partition id and task attempt id
+      int spareBits = sequenceNoBits - 31;
+
+      // make spareBits even, so we add the same number of bits to partitionIdBits and taskAttemptIdBits
+      spareBits += spareBits % 2;
+
+      // move spare bits over
+      partitionIdBits += spareBits / 2;
+      taskAttemptIdBits += spareBits / 2;
+      maxPartitions = (1 << partitionIdBits);
+
+      // log with original sequenceNoBits
+      if (LOG.isInfoEnabled()) {
+        LOG.info(
+            "Increasing "
+                + RssSparkConfig.RSS_MAX_PARTITIONS.key()
+                + " to "
+                + maxPartitions
+                + ", "
+                + "otherwise we would have to support 2^"
+                + sequenceNoBits
+                + " (more than 2^31) sequence numbers.");
+      }
+
+      // remove spare bits
+      sequenceNoBits -= spareBits;
+
+    // propagate the changed value back to SparkConf
+      sparkConf.set(RssSparkConfig.RSS_MAX_PARTITIONS.key(), String.valueOf(maxPartitions));
+    }
+
+    // set block id layout config in RssConf
+    rssConf.set(RssClientConf.BLOCKID_SEQUENCE_NO_BITS, sequenceNoBits);
+    rssConf.set(RssClientConf.BLOCKID_PARTITION_ID_BITS, partitionIdBits);
+    rssConf.set(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS, taskAttemptIdBits);
+
+    // materialize these RssConf settings in sparkConf as well
+    // so that RssSparkConfig.toRssConf(sparkConf) provides this configuration
+    sparkConf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key(),
+        String.valueOf(sequenceNoBits));
+    sparkConf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_PARTITION_ID_BITS.key(),
+        String.valueOf(partitionIdBits));
+    sparkConf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key(),
+        String.valueOf(taskAttemptIdBits));
+  }
+
+  private static void configureBlockIdLayoutFromLayoutConfig(
+      SparkConf sparkConf, RssConf rssConf, int maxFailures, boolean speculation) {
+    String sparkPrefix = RssSparkConfig.SPARK_RSS_CONFIG_PREFIX;
+    String sparkSeqNoBitsKey = sparkPrefix + RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key();
+    String sparkPartIdBitsKey = sparkPrefix + RssClientConf.BLOCKID_PARTITION_ID_BITS.key();
+    String sparkTaskIdBitsKey = sparkPrefix + RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key();
+
+    // if one bit field is configured, all three must be given
+    List<String> sparkKeys =
+        Arrays.asList(sparkSeqNoBitsKey, sparkPartIdBitsKey, sparkTaskIdBitsKey);
+    if (sparkKeys.stream().anyMatch(sparkConf::contains)
+        && !sparkKeys.stream().allMatch(sparkConf::contains)) {
+      String allKeys = sparkKeys.stream().collect(Collectors.joining(", "));
+      String existingKeys =
+          Arrays.stream(sparkConf.getAll())
+              .map(t -> t._1)
+              .filter(sparkKeys.stream().collect(Collectors.toSet())::contains)
+              .collect(Collectors.joining(", "));
+      throw new IllegalArgumentException(
+          "All block id bit config keys must be provided ("
+              + allKeys
+              + "), not just a sub-set: "
+              + existingKeys);
+    }
+
+    // if one bit field is configured, all three must be given
+    List<ConfigOption<Integer>> rssKeys =
+        Arrays.asList(
+            RssClientConf.BLOCKID_SEQUENCE_NO_BITS,
+            RssClientConf.BLOCKID_PARTITION_ID_BITS,
+            RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS);
+    if (rssKeys.stream().anyMatch(rssConf::contains)
+        && !rssKeys.stream().allMatch(rssConf::contains)) {
+      String allKeys = rssKeys.stream().map(ConfigOption::key).collect(Collectors.joining(", "));
+      String existingKeys =
+          rssConf.getKeySet().stream()
+              .filter(rssKeys.stream().map(ConfigOption::key).collect(Collectors.toSet())::contains)
+              .collect(Collectors.joining(", "));
+      throw new IllegalArgumentException(
+          "All block id bit config keys must be provided ("
+              + allKeys
+              + "), not just a sub-set: "
+              + existingKeys);
+    }
+
+    if (sparkKeys.stream().allMatch(sparkConf::contains)) {
+      rssConf.set(RssClientConf.BLOCKID_SEQUENCE_NO_BITS, sparkConf.getInt(sparkSeqNoBitsKey, 0));
+      rssConf.set(RssClientConf.BLOCKID_PARTITION_ID_BITS, sparkConf.getInt(sparkPartIdBitsKey, 0));
+      rssConf.set(
+          RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS, sparkConf.getInt(sparkTaskIdBitsKey, 0));
+    } else if (rssKeys.stream().allMatch(rssConf::contains)) {
+      sparkConf.set(sparkSeqNoBitsKey, rssConf.getValue(RssClientConf.BLOCKID_SEQUENCE_NO_BITS));
+      sparkConf.set(sparkPartIdBitsKey, rssConf.getValue(RssClientConf.BLOCKID_PARTITION_ID_BITS));
+      sparkConf.set(
+          sparkTaskIdBitsKey, rssConf.getValue(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS));
+    } else {
+      // use default max partitions
+      sparkConf.set(
+          RssSparkConfig.RSS_MAX_PARTITIONS.key(),
+          RssSparkConfig.RSS_MAX_PARTITIONS.defaultValueString());
+      configureBlockIdLayoutFromMaxPartitions(sparkConf, rssConf, maxFailures, speculation);
+    }
+  }
+
+  protected static int getMaxAttemptNo(int maxFailures, boolean speculation) {
+    // attempt number is zero based: 0, 1, …, maxFailures-1
+    // maxFailures < 1 is not allowed, but for safety we interpret that as maxFailures == 1
+    int maxAttemptNo = maxFailures < 1 ? 0 : maxFailures - 1;
+
+    // with speculative execution enabled we could observe +1 attempts
+    if (speculation) {
+      maxAttemptNo++;
+    }
+
+    return maxAttemptNo;
+  }
+
+  protected static int getAttemptIdBits(int maxAttemptNo) {
+    return 32 - Integer.numberOfLeadingZeros(maxAttemptNo);
+  }
+
+  /** See static overload of this method. */
   public abstract long getTaskAttemptIdForBlockId(int mapIndex, int attemptNo);
 
   /**
@@ -68,14 +281,8 @@
    */
   protected static long getTaskAttemptIdForBlockId(
       int mapIndex, int attemptNo, int maxFailures, boolean speculation, int maxTaskAttemptIdBits) {
-    // attempt number is zero based: 0, 1, …, maxFailures-1
-    // max maxFailures < 1 is not allowed but for safety, we interpret that as maxFailures == 1
-    int maxAttemptNo = maxFailures < 1 ? 0 : maxFailures - 1;
-
-    // with speculative execution enabled we could observe +1 attempts
-    if (speculation) {
-      maxAttemptNo++;
-    }
+    int maxAttemptNo = getMaxAttemptNo(maxFailures, speculation);
+    int attemptBits = getAttemptIdBits(maxAttemptNo);
 
     if (attemptNo > maxAttemptNo) {
       // this should never happen, if it does, our assumptions are wrong,
@@ -89,7 +296,6 @@
               + ".");
     }
 
-    int attemptBits = 32 - Integer.numberOfLeadingZeros(maxAttemptNo);
     int mapIndexBits = 32 - Integer.numberOfLeadingZeros(mapIndex);
     if (mapIndexBits + attemptBits > maxTaskAttemptIdBits) {
       throw new RssException(
diff --git a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
index 9440a69..440c8fb 100644
--- a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
+++ b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
@@ -18,15 +18,24 @@
 package org.apache.uniffle.shuffle.manager;
 
 import java.util.Arrays;
+import java.util.stream.Stream;
 
 import org.apache.spark.SparkConf;
+import org.apache.spark.shuffle.RssSparkConfig;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 
 import static org.apache.uniffle.shuffle.manager.RssShuffleManagerBase.getTaskAttemptIdForBlockId;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -45,6 +54,341 @@
     assertEquals(remoteStorageInfo.getConfItems().get("fs.defaultFs"), "hdfs://rbf-xxx/foo");
   }
 
+  private static Stream<Arguments> testConfigureBlockIdLayoutSource() {
+    // test arguments are
+    // - maxPartitions
+    // - maxFailure
+    // - speculation
+    // - expected maxPartitions
+    // - expected sequence number bits
+    // - expected partition id bits
+    // - expected task attempt id bits
+    return Stream.of(
+        // default config values
+        Arguments.of(null, 4, false, "1048576", 21, 20, 22),
+
+        // without speculation
+        Arguments.of("2", 0, false, "65536", 31, 16, 16),
+        Arguments.of("2147483647", 0, false, "2147483647", 1, 31, 31),
+        Arguments.of("1024", 3, false, "32768", 31, 15, 17),
+        Arguments.of("131072", 3, false, "131072", 27, 17, 19),
+        Arguments.of("1048576", 3, false, "1048576", 21, 20, 22),
+        Arguments.of("1048577", 3, false, "1048577", 19, 21, 23),
+        Arguments.of("1024", 4, false, "32768", 31, 15, 17),
+        Arguments.of("131072", 4, false, "131072", 27, 17, 19),
+        Arguments.of("1048576", 4, false, "1048576", 21, 20, 22),
+        Arguments.of("1048577", 4, false, "1048577", 19, 21, 23),
+        Arguments.of("1024", 5, false, "32768", 30, 15, 18),
+        Arguments.of("131072", 5, false, "131072", 26, 17, 20),
+        Arguments.of("1048576", 5, false, "1048576", 20, 20, 23),
+        Arguments.of("1048577", 5, false, "1048577", 18, 21, 24),
+        Arguments.of("2", 1073741824, false, "2", 31, 1, 31),
+
+        // with speculation
+        Arguments.of("2", 0, true, "65536", 30, 16, 17),
+        Arguments.of("1073741824", 0, true, "1073741824", 2, 30, 31),
+        Arguments.of("1024", 3, true, "32768", 31, 15, 17),
+        Arguments.of("131072", 3, true, "131072", 27, 17, 19),
+        Arguments.of("1048576", 3, true, "1048576", 21, 20, 22),
+        Arguments.of("1048577", 3, true, "1048577", 19, 21, 23),
+        Arguments.of("1024", 4, true, "32768", 30, 15, 18),
+        Arguments.of("131072", 4, true, "131072", 26, 17, 20),
+        Arguments.of("1048576", 4, true, "1048576", 20, 20, 23),
+        Arguments.of("1048577", 4, true, "1048577", 18, 21, 24),
+        Arguments.of("1024", 5, true, "32768", 30, 15, 18),
+        Arguments.of("131072", 5, true, "131072", 26, 17, 20),
+        Arguments.of("1048576", 5, true, "1048576", 20, 20, 23),
+        Arguments.of("1048577", 5, true, "1048577", 18, 21, 24),
+        Arguments.of("2", 1073741823, true, "2", 31, 1, 31));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testConfigureBlockIdLayoutSource")
+  public void testConfigureBlockIdLayout(
+      String setMaxPartitions,
+      Integer setMaxFailure,
+      Boolean setSpeculation,
+      String expectedMaxPartitions,
+      int expectedSequenceNoBits,
+      int expectedPartitionIdBits,
+      int expectedTaskAttemptIdBits) {
+    SparkConf sparkConf = new SparkConf();
+    if (setMaxPartitions != null) {
+      sparkConf.set(RssSparkConfig.RSS_MAX_PARTITIONS.key(), setMaxPartitions);
+    }
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+
+    RssShuffleManagerBase.configureBlockIdLayout(sparkConf, rssConf, setMaxFailure, setSpeculation);
+
+    if (expectedMaxPartitions == null) {
+      assertFalse(sparkConf.contains(RssSparkConfig.RSS_MAX_PARTITIONS.key()));
+    } else {
+      assertTrue(sparkConf.contains(RssSparkConfig.RSS_MAX_PARTITIONS.key()));
+      assertEquals(expectedMaxPartitions, sparkConf.get(RssSparkConfig.RSS_MAX_PARTITIONS.key()));
+    }
+
+    String key;
+    key = RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key();
+    assertTrue(sparkConf.contains(key));
+    assertEquals(String.valueOf(expectedSequenceNoBits), sparkConf.get(key));
+    assertEquals(expectedSequenceNoBits, rssConf.get(RssClientConf.BLOCKID_SEQUENCE_NO_BITS));
+
+    key = RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_PARTITION_ID_BITS.key();
+    assertTrue(sparkConf.contains(key));
+    assertEquals(String.valueOf(expectedPartitionIdBits), sparkConf.get(key));
+    assertEquals(expectedPartitionIdBits, rssConf.get(RssClientConf.BLOCKID_PARTITION_ID_BITS));
+
+    key = RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key();
+    assertTrue(sparkConf.contains(key));
+    assertEquals(String.valueOf(expectedTaskAttemptIdBits), sparkConf.get(key));
+    assertEquals(
+        expectedTaskAttemptIdBits, rssConf.get(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS));
+  }
+
+  @Test
+  public void testConfigureBlockIdLayoutOverrides() {
+    SparkConf sparkConf = new SparkConf();
+    RssConf rssConf = new RssConf();
+    int maxFailures = 4;
+    boolean speculation = false;
+
+    String sparkPrefix = RssSparkConfig.SPARK_RSS_CONFIG_PREFIX;
+    @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+    String sparkSeqNoBitsKey = sparkPrefix + RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key();
+    @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+    String sparkPartIdBitsKey = sparkPrefix + RssClientConf.BLOCKID_PARTITION_ID_BITS.key();
+    @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+    String sparkTaskIdBitsKey = sparkPrefix + RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key();
+
+    // SparkConf populates RssConf
+    sparkConf.set(RssSparkConfig.RSS_MAX_PARTITIONS.key(), "131072");
+    RssShuffleManagerBase.configureBlockIdLayout(sparkConf, rssConf, maxFailures, speculation);
+    assertEquals(27, rssConf.get(RssClientConf.BLOCKID_SEQUENCE_NO_BITS));
+    assertEquals(17, rssConf.get(RssClientConf.BLOCKID_PARTITION_ID_BITS));
+    assertEquals(19, rssConf.get(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS));
+    assertEquals(131072, sparkConf.getInt(RssSparkConfig.RSS_MAX_PARTITIONS.key(), -1));
+    assertEquals(27, sparkConf.getInt(sparkSeqNoBitsKey, -1));
+    assertEquals(17, sparkConf.getInt(sparkPartIdBitsKey, -1));
+    assertEquals(19, sparkConf.getInt(sparkTaskIdBitsKey, -1));
+
+    // SparkConf overrides RssConf
+    sparkConf.set(RssSparkConfig.RSS_MAX_PARTITIONS.key(), "131073");
+    RssShuffleManagerBase.configureBlockIdLayout(sparkConf, rssConf, maxFailures, speculation);
+    assertEquals(25, rssConf.get(RssClientConf.BLOCKID_SEQUENCE_NO_BITS));
+    assertEquals(18, rssConf.get(RssClientConf.BLOCKID_PARTITION_ID_BITS));
+    assertEquals(20, rssConf.get(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS));
+    assertEquals(131073, sparkConf.getInt(RssSparkConfig.RSS_MAX_PARTITIONS.key(), -1));
+    assertEquals(25, sparkConf.getInt(sparkSeqNoBitsKey, -1));
+    assertEquals(18, sparkConf.getInt(sparkPartIdBitsKey, -1));
+    assertEquals(20, sparkConf.getInt(sparkTaskIdBitsKey, -1));
+
+    // SparkConf block id config overrides RssConf
+    sparkConf.remove(RssSparkConfig.RSS_MAX_PARTITIONS.key());
+    sparkConf.set(sparkSeqNoBitsKey, "22");
+    sparkConf.set(sparkPartIdBitsKey, "21");
+    sparkConf.set(sparkTaskIdBitsKey, "20");
+    RssShuffleManagerBase.configureBlockIdLayout(sparkConf, rssConf, maxFailures, speculation);
+    assertEquals(22, rssConf.get(RssClientConf.BLOCKID_SEQUENCE_NO_BITS));
+    assertEquals(21, rssConf.get(RssClientConf.BLOCKID_PARTITION_ID_BITS));
+    assertEquals(20, rssConf.get(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS));
+    assertFalse(sparkConf.contains(RssSparkConfig.RSS_MAX_PARTITIONS.key()));
+    assertEquals(22, sparkConf.getInt(sparkSeqNoBitsKey, -1));
+    assertEquals(21, sparkConf.getInt(sparkPartIdBitsKey, -1));
+    assertEquals(20, sparkConf.getInt(sparkTaskIdBitsKey, -1));
+
+    // empty SparkConf preserves RssConf
+    sparkConf = new SparkConf();
+    RssShuffleManagerBase.configureBlockIdLayout(sparkConf, rssConf, maxFailures, speculation);
+    assertEquals(22, rssConf.get(RssClientConf.BLOCKID_SEQUENCE_NO_BITS));
+    assertEquals(21, rssConf.get(RssClientConf.BLOCKID_PARTITION_ID_BITS));
+    assertEquals(20, rssConf.get(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS));
+    assertFalse(sparkConf.contains(RssSparkConfig.RSS_MAX_PARTITIONS.key()));
+    assertEquals(22, sparkConf.getInt(sparkSeqNoBitsKey, -1));
+    assertEquals(21, sparkConf.getInt(sparkPartIdBitsKey, -1));
+    assertEquals(20, sparkConf.getInt(sparkTaskIdBitsKey, -1));
+  }
+
+  private static Stream<Arguments> testConfigureBlockIdLayoutMaxPartitionsValueExceptionSource() {
+    // test arguments are
+    // - maxPartitions
+    // - maxFailure
+    // - speculation
+    return Stream.of(
+        // without speculation
+        Arguments.of("-1", 4, false),
+        Arguments.of("0", 4, false),
+
+        // with speculation
+        Arguments.of("-1", 4, true),
+        Arguments.of("0", 4, true),
+        Arguments.of("1", 4, true));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testConfigureBlockIdLayoutMaxPartitionsValueExceptionSource")
+  public void testConfigureBlockIdLayoutMaxPartitionsValueException(
+      String setMaxPartitions, int setMaxFailure, boolean setSpeculation) {
+    SparkConf sparkConf = new SparkConf();
+    if (setMaxPartitions != null) {
+      sparkConf.set(RssSparkConfig.RSS_MAX_PARTITIONS.key(), setMaxPartitions);
+    }
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+
+    Executable call =
+        () ->
+            RssShuffleManagerBase.configureBlockIdLayout(
+                sparkConf, rssConf, setMaxFailure, setSpeculation);
+    Exception e = assertThrowsExactly(IllegalArgumentException.class, call);
+
+    String expectedMessage =
+        "Value of spark.rss.blockId.maxPartitions must be larger than 1: " + setMaxPartitions;
+    assertEquals(expectedMessage, e.getMessage());
+  }
+
+  private static Stream<Arguments> testConfigureBlockIdLayoutUnsupportedMaxPartitionsSource() {
+    // test arguments are
+    // - maxPartitions
+    // - maxFailure
+    // - speculation
+    // - expected message
+    return Stream.of(
+        // without speculation
+        Arguments.of("2097152", 2048, false, "1048576"),
+        Arguments.of("536870913", 3, false, "536870912"),
+        Arguments.of("1073741825", 2, false, "1073741824"),
+
+        // with speculation
+        Arguments.of("2097152", 2048, true, "524288"),
+        Arguments.of("536870913", 3, true, "536870912"),
+        Arguments.of("1073741824", 2, true, "536870912"));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testConfigureBlockIdLayoutUnsupportedMaxPartitionsSource")
+  public void testConfigureBlockIdLayoutUnsupportedMaxPartitions(
+      String setMaxPartitions, int setMaxFailure, boolean setSpeculation, String atMost) {
+    SparkConf sparkConf = new SparkConf();
+    if (setMaxPartitions != null) {
+      sparkConf.set(RssSparkConfig.RSS_MAX_PARTITIONS.key(), setMaxPartitions);
+    }
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+
+    String expectedMessage =
+        "Cannot support spark.rss.blockId.maxPartitions="
+            + setMaxPartitions
+            + " partitions, as this would require to reserve more than 31 bits in the block id for task attempt ids. With spark.maxFailures="
+            + setMaxFailure
+            + " and spark.speculation="
+            + setSpeculation
+            + " at most "
+            + atMost
+            + " partitions can be supported.";
+    Executable call =
+        () ->
+            RssShuffleManagerBase.configureBlockIdLayout(
+                sparkConf, rssConf, setMaxFailure, setSpeculation);
+    Exception e = assertThrowsExactly(IllegalArgumentException.class, call);
+    assertEquals(expectedMessage, e.getMessage());
+  }
+
+  private static Stream<Arguments> testConfigureBlockIdLayoutInsufficientConfigExceptionSource() {
+    // test arguments are
+    // - sequenceNoBits
+    // - partitionIdBits
+    // - taskAttemptIdBits
+    // - config
+    return Stream.of("spark", "rss")
+        .flatMap(
+            config ->
+                Stream.of(
+                    Arguments.of(null, 21, 22, config),
+                    Arguments.of(20, null, 22, config),
+                    Arguments.of(20, 21, null, config)));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testConfigureBlockIdLayoutInsufficientConfigExceptionSource")
+  public void testConfigureBlockIdLayoutInsufficientConfigException(
+      Integer sequenceNoBits, Integer partitionIdBits, Integer taskAttemptIdBits, String config) {
+    SparkConf sparkConf = new SparkConf();
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+
+    if (config.equals("spark")) {
+      String sparkPrefix = RssSparkConfig.SPARK_RSS_CONFIG_PREFIX;
+      String sparkSeqNoBitsKey = sparkPrefix + RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key();
+      String sparkPartIdBitsKey = sparkPrefix + RssClientConf.BLOCKID_PARTITION_ID_BITS.key();
+      String sparkTaskIdBitsKey = sparkPrefix + RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key();
+
+      if (sequenceNoBits != null) {
+        sparkConf.set(sparkSeqNoBitsKey, sequenceNoBits.toString());
+      }
+      if (partitionIdBits != null) {
+        sparkConf.set(sparkPartIdBitsKey, partitionIdBits.toString());
+      }
+      if (taskAttemptIdBits != null) {
+        sparkConf.set(sparkTaskIdBitsKey, taskAttemptIdBits.toString());
+      }
+    } else if (config.equals("rss")) {
+      if (sequenceNoBits != null) {
+        rssConf.set(RssClientConf.BLOCKID_SEQUENCE_NO_BITS, sequenceNoBits);
+      }
+      if (partitionIdBits != null) {
+        rssConf.set(RssClientConf.BLOCKID_PARTITION_ID_BITS, partitionIdBits);
+      }
+      if (taskAttemptIdBits != null) {
+        rssConf.set(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS, taskAttemptIdBits);
+      }
+    } else {
+      throw new IllegalArgumentException(config);
+    }
+
+    Executable call =
+        () -> RssShuffleManagerBase.configureBlockIdLayout(sparkConf, rssConf, 4, false);
+    Exception e = assertThrowsExactly(IllegalArgumentException.class, call);
+
+    assertTrue(e.getMessage().startsWith("All block id bit config keys must be provided "));
+  }
+
+  @Test
+  public void testGetMaxAttemptNo() {
+    // without speculation
+    assertEquals(0, RssShuffleManagerBase.getMaxAttemptNo(-1, false));
+    assertEquals(0, RssShuffleManagerBase.getMaxAttemptNo(0, false));
+    assertEquals(0, RssShuffleManagerBase.getMaxAttemptNo(1, false));
+    assertEquals(1, RssShuffleManagerBase.getMaxAttemptNo(2, false));
+    assertEquals(2, RssShuffleManagerBase.getMaxAttemptNo(3, false));
+    assertEquals(3, RssShuffleManagerBase.getMaxAttemptNo(4, false));
+    assertEquals(4, RssShuffleManagerBase.getMaxAttemptNo(5, false));
+    assertEquals(1023, RssShuffleManagerBase.getMaxAttemptNo(1024, false));
+
+    // with speculation
+    assertEquals(1, RssShuffleManagerBase.getMaxAttemptNo(-1, true));
+    assertEquals(1, RssShuffleManagerBase.getMaxAttemptNo(0, true));
+    assertEquals(1, RssShuffleManagerBase.getMaxAttemptNo(1, true));
+    assertEquals(2, RssShuffleManagerBase.getMaxAttemptNo(2, true));
+    assertEquals(3, RssShuffleManagerBase.getMaxAttemptNo(3, true));
+    assertEquals(4, RssShuffleManagerBase.getMaxAttemptNo(4, true));
+    assertEquals(5, RssShuffleManagerBase.getMaxAttemptNo(5, true));
+    assertEquals(1024, RssShuffleManagerBase.getMaxAttemptNo(1024, true));
+  }
+
+  @Test
+  public void testGetAttemptIdBits() {
+    assertEquals(0, RssShuffleManagerBase.getAttemptIdBits(0));
+    assertEquals(1, RssShuffleManagerBase.getAttemptIdBits(1));
+    assertEquals(2, RssShuffleManagerBase.getAttemptIdBits(2));
+    assertEquals(2, RssShuffleManagerBase.getAttemptIdBits(3));
+    assertEquals(3, RssShuffleManagerBase.getAttemptIdBits(4));
+    assertEquals(3, RssShuffleManagerBase.getAttemptIdBits(5));
+    assertEquals(3, RssShuffleManagerBase.getAttemptIdBits(6));
+    assertEquals(3, RssShuffleManagerBase.getAttemptIdBits(7));
+    assertEquals(4, RssShuffleManagerBase.getAttemptIdBits(8));
+    assertEquals(4, RssShuffleManagerBase.getAttemptIdBits(9));
+    assertEquals(10, RssShuffleManagerBase.getAttemptIdBits(1023));
+    assertEquals(11, RssShuffleManagerBase.getAttemptIdBits(1024));
+    assertEquals(11, RssShuffleManagerBase.getAttemptIdBits(1025));
+  }
+
   private long bits(String string) {
     return Long.parseLong(string.replaceAll("[|]", ""), 2);
   }
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 3cb78dd..75cda78 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -146,7 +146,10 @@
     this.sparkConf = sparkConf;
     this.maxFailures = sparkConf.getInt("spark.task.maxFailures", 4);
     this.speculation = sparkConf.getBoolean("spark.speculation", false);
-    this.blockIdLayout = BlockIdLayout.from(RssSparkConfig.toRssConf(sparkConf));
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+    // configureBlockIdLayout requires maxFailures and speculation to be initialized
+    configureBlockIdLayout(sparkConf, rssConf);
+    this.blockIdLayout = BlockIdLayout.from(rssConf);
     this.user = sparkConf.get("spark.rss.quota.user", "user");
     this.uuid = sparkConf.get("spark.rss.quota.uuid", Long.toString(System.currentTimeMillis()));
     // set & check replica config
@@ -182,7 +185,6 @@
         sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE);
     int unregisterRequestTimeoutSec =
         sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC);
-    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
     this.shuffleWriteClient =
         ShuffleClientFactory.getInstance()
             .createShuffleWriteClient(
@@ -496,6 +498,18 @@
     }
   }
 
+  /**
+   * Derives the block id layout config from the maximum number of allowed partitions. Computes
+   * the number of bits required for the partition id and the task attempt id, and reserves the
+   * remaining bits for the sequence number.
+   *
+   * @param sparkConf Spark config providing max partitions
+   * @param rssConf Rss config to amend
+   */
+  public void configureBlockIdLayout(SparkConf sparkConf, RssConf rssConf) {
+    configureBlockIdLayout(sparkConf, rssConf, maxFailures, speculation);
+  }
+
   @Override
   public long getTaskAttemptIdForBlockId(int mapIndex, int attemptNo) {
     return getTaskAttemptIdForBlockId(
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 59e19d1..891ccd6 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -184,11 +184,13 @@
     this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
     this.dynamicConfEnabled = sparkConf.get(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED);
     this.dataDistributionType = getDataDistributionType(sparkConf);
-    this.blockIdLayout = BlockIdLayout.from(RssSparkConfig.toRssConf(sparkConf));
-    this.maxConcurrencyPerPartitionToWrite =
-        RssSparkConfig.toRssConf(sparkConf).get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+    this.maxConcurrencyPerPartitionToWrite = rssConf.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
     this.maxFailures = sparkConf.getInt("spark.task.maxFailures", 4);
     this.speculation = sparkConf.getBoolean("spark.speculation", false);
+    // configureBlockIdLayout requires maxFailures and speculation to be initialized
+    configureBlockIdLayout(sparkConf, rssConf);
+    this.blockIdLayout = BlockIdLayout.from(rssConf);
     long retryIntervalMax = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
     int heartBeatThreadNum = sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
     this.dataTransferPoolSize = sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
@@ -197,7 +199,6 @@
         sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE);
     int unregisterRequestTimeoutSec =
         sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC);
-    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
     shuffleWriteClient =
         ShuffleClientFactory.getInstance()
             .createShuffleWriteClient(
@@ -308,13 +309,14 @@
       Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker) {
     this.sparkConf = conf;
     this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
-    this.dataDistributionType =
-        RssSparkConfig.toRssConf(sparkConf).get(RssClientConf.DATA_DISTRIBUTION_TYPE);
-    this.blockIdLayout = BlockIdLayout.from(RssSparkConfig.toRssConf(sparkConf));
-    this.maxConcurrencyPerPartitionToWrite =
-        RssSparkConfig.toRssConf(sparkConf).get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
+    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
+    this.dataDistributionType = rssConf.get(RssClientConf.DATA_DISTRIBUTION_TYPE);
+    this.blockIdLayout = BlockIdLayout.from(rssConf);
+    this.maxConcurrencyPerPartitionToWrite = rssConf.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
     this.maxFailures = sparkConf.getInt("spark.task.maxFailures", 4);
     this.speculation = sparkConf.getBoolean("spark.speculation", false);
+    // configureBlockIdLayout requires maxFailures and speculation to be initialized
+    configureBlockIdLayout(sparkConf, rssConf);
     this.heartbeatInterval = sparkConf.get(RssSparkConfig.RSS_HEARTBEAT_INTERVAL);
     this.heartbeatTimeout =
         sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_TIMEOUT.key(), heartbeatInterval / 2);
@@ -359,7 +361,7 @@
                     .dataCommitPoolSize(dataCommitPoolSize)
                     .unregisterThreadPoolSize(unregisterThreadPoolSize)
                     .unregisterRequestTimeSec(unregisterRequestTimeoutSec)
-                    .rssConf(RssSparkConfig.toRssConf(sparkConf)));
+                    .rssConf(rssConf));
     this.taskToSuccessBlockIds = taskToSuccessBlockIds;
     this.heartBeatScheduledExecutorService = null;
     this.taskToFailedBlockSendTracker = taskToFailedBlockSendTracker;
@@ -536,6 +538,11 @@
   }
 
   @Override
+  public void configureBlockIdLayout(SparkConf sparkConf, RssConf rssConf) {
+    configureBlockIdLayout(sparkConf, rssConf, maxFailures, speculation);
+  }
+
+  @Override
   public long getTaskAttemptIdForBlockId(int mapIndex, int attemptNo) {
     return getTaskAttemptIdForBlockId(
         mapIndex, attemptNo, maxFailures, speculation, blockIdLayout.taskAttemptIdBits);
diff --git a/common/src/main/java/org/apache/uniffle/common/util/BlockIdLayout.java b/common/src/main/java/org/apache/uniffle/common/util/BlockIdLayout.java
index 33d8cd6..efba2b8 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/BlockIdLayout.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/BlockIdLayout.java
@@ -31,6 +31,8 @@
  */
 public class BlockIdLayout {
 
+  // historic default values, client-specific config defaults may vary
+  // see RssSparkConfig.RSS_MAX_PARTITIONS
   public static final BlockIdLayout DEFAULT = BlockIdLayout.from(18, 24, 21);
 
   public final int sequenceNoBits;
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index 8c769f7..1f4dd0b 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -22,7 +22,7 @@
   private Constants() {}
 
   // the value is used for client/server compatible, eg, online upgrade
-  public static final String SHUFFLE_SERVER_VERSION = "ss_v4";
+  public static final String SHUFFLE_SERVER_VERSION = "ss_v5";
   public static final String METRICS_TAG_LABEL_NAME = "tags";
   public static final String COORDINATOR_TAG = "coordinator";
   public static final String SHUFFLE_DATA_FILE_SUFFIX = ".data";
diff --git a/docs/client_guide/spark_client_guide.md b/docs/client_guide/spark_client_guide.md
index 779d92e..bbcfe59 100644
--- a/docs/client_guide/spark_client_guide.md
+++ b/docs/client_guide/spark_client_guide.md
@@ -101,7 +101,22 @@
 
 you should consider increasing the bits reserved in the blockId for that number / id (while decreasing the other number of bits).
 
-The bits reserved for sequence number, partition id and task attempt id are best specified for Spark clients as follows:
+With the Spark client, configuring the blockId bits is as easy as defining the maximum number of supported partitions:
+
+| Property Name                   | Default | Description                                                                                                                                                         |
+|---------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| spark.rss.blockId.maxPartitions | 1048576 | Maximum number of partitions supported by the Spark client (`[2..2,147,483,647]`). |
+
+The Spark client derives the optimal values for the following properties.
+Alternatively, these properties can be configured instead of `spark.rss.blockId.maxPartitions`:
+
+| Property Name                       | Default | Description                                                                                                                                                         |
+|-------------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| spark.rss.blockId.sequenceNoBits    | 18      | Number of bits reserved in the blockId for the sequence number (`[1..31]`). Note that `sequenceNoBits + partitionIdBits + taskAttemptIdBits` has to sum up to `63`. |
+| spark.rss.blockId.partitionIdBits   | 24      | Number of bits reserved in the blockId for the partition id (`[1..31]`). Note that `sequenceNoBits + partitionIdBits + taskAttemptIdBits` has to sum up to `63`.    |
+| spark.rss.blockId.taskAttemptIdBits | 21      | Number of bits reserved in the blockId for the task attempt id (`[1..31]`). Note that `sequenceNoBits + partitionIdBits + taskAttemptIdBits` has to sum up to `63`. |
+
+The bits reserved for sequence number, partition id and task attempt id are best specified for Spark clients as follows (this is done automatically when `spark.rss.blockId.maxPartitions` is set):
 
 1. Reserve the bits required to support the largest number of partitions that you anticipate. Pick `ceil( log(max number of partitions) / log(2) )` bits.
    For instance, `20` bits support `1,048,576` partitions.
@@ -111,12 +126,6 @@
    For example: `22` bits is sufficient for `taskAttemptIdBits` with `partitionIdBits=20`, and Spark conf `spark.task.maxFailures=4` and `spark.speculation=false`.
 3. Reserve the remaining bits to `sequenceNoBits`: `sequenceNoBits = 63 - partitionIdBits - taskAttemptIdBits`.
 
-| Property Name                       | Default | Description                                                                                                                                                         |
-|-------------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| spark.rss.blockId.sequenceNoBits    | 18      | Number of bits reserved in the blockId for the sequence number (`[1..31]`). Note that `sequenceNoBits + partitionIdBits + taskAttemptIdBits` has to sum up to `63`. |
-| spark.rss.blockId.partitionIdBits   | 24      | Number of bits reserved in the blockId for the partition id (`[1..31]`). Note that `sequenceNoBits + partitionIdBits + taskAttemptIdBits` has to sum up to `63`.    |
-| spark.rss.blockId.taskAttemptIdBits | 21      | Number of bits reserved in the blockId for the task attempt id (`[1..31]`). Note that `sequenceNoBits + partitionIdBits + taskAttemptIdBits` has to sum up to `63`. |
-
 ### Adaptive Remote Shuffle Enabling 
 Currently, this feature only supports Spark. 
 
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index ea598e4..39e3dee 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -75,7 +75,6 @@
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.BlockIdLayout;
-import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.proto.RssProtos;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
@@ -420,20 +419,6 @@
     addExpectedBlockIds(expectedP3, blockIds3);
     assertEquals(expectedP3, blockIdBitmap);
 
-    // get same shuffle result without block id layout (legacy clients)
-    RssProtos.GetShuffleResultRequest rpcRequest =
-        RssProtos.GetShuffleResultRequest.newBuilder()
-            .setAppId("shuffleResultTest")
-            .setShuffleId(4)
-            .setPartitionId(3)
-            // deliberately not setting block id layout through .setBlockIdLayout
-            .build();
-    RssProtos.GetShuffleResultResponse rpcResponse =
-        grpcShuffleServerClient.getBlockingStub().getShuffleResult(rpcRequest);
-    assertEquals(RssProtos.StatusCode.SUCCESS, rpcResponse.getStatus());
-    blockIdBitmap = RssUtils.deserializeBitMap(rpcResponse.getSerializedBitmap().toByteArray());
-    assertEquals(expectedP3, blockIdBitmap);
-
     // wait resources are deleted
     Thread.sleep(12000);
     req = new RssGetShuffleResultRequest("shuffleResultTest", 1, 1, layout);
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
index 6d797a8..ac6d739 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
@@ -103,6 +103,7 @@
     return new HashMap();
   }
 
+  private static final BlockIdLayout DEFAULT = BlockIdLayout.from(21, 20, 22);
   private static final BlockIdLayout CUSTOM1 = BlockIdLayout.from(20, 21, 22);
   private static final BlockIdLayout CUSTOM2 = BlockIdLayout.from(22, 18, 23);
 
@@ -113,7 +114,7 @@
   @ParameterizedTest
   @ValueSource(booleans = {false, true})
   public void testRssShuffleManager(boolean enableDynamicClientConf) throws Exception {
-    doTestRssShuffleManager(null, null, BlockIdLayout.DEFAULT, enableDynamicClientConf);
+    doTestRssShuffleManager(null, null, DEFAULT, enableDynamicClientConf);
   }
 
   @ParameterizedTest
@@ -187,6 +188,7 @@
       // get written block ids (we know there is one shuffle where two task attempts wrote two
       // partitions)
       RssConf rssConf = RssSparkConfig.toRssConf(conf);
+      shuffleManager.configureBlockIdLayout(conf, rssConf);
       ShuffleWriteClient shuffleWriteClient =
           ShuffleClientFactory.newWriteBuilder()
               .clientType(ClientType.GRPC.name())
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 038e03b..695c2af 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -529,15 +529,11 @@
     String appId = request.getAppId();
     int shuffleId = request.getShuffleId();
     int partitionId = request.getPartitionId();
-    BlockIdLayout blockIdLayout = BlockIdLayout.DEFAULT;
-    // legacy clients might send request without block id layout, we fall back to DEFAULT then
-    if (request.hasBlockIdLayout()) {
-      blockIdLayout =
-          BlockIdLayout.from(
-              request.getBlockIdLayout().getSequenceNoBits(),
-              request.getBlockIdLayout().getPartitionIdBits(),
-              request.getBlockIdLayout().getTaskAttemptIdBits());
-    }
+    BlockIdLayout blockIdLayout =
+        BlockIdLayout.from(
+            request.getBlockIdLayout().getSequenceNoBits(),
+            request.getBlockIdLayout().getPartitionIdBits(),
+            request.getBlockIdLayout().getTaskAttemptIdBits());
     StatusCode status = StatusCode.SUCCESS;
     String msg = "OK";
     GetShuffleResultResponse reply;
@@ -581,15 +577,11 @@
     String appId = request.getAppId();
     int shuffleId = request.getShuffleId();
     List<Integer> partitionsList = request.getPartitionsList();
-    BlockIdLayout blockIdLayout = BlockIdLayout.DEFAULT;
-    // legacy clients might send request without block id layout, we fall back to DEFAULT then
-    if (request.hasBlockIdLayout()) {
-      blockIdLayout =
-          BlockIdLayout.from(
-              request.getBlockIdLayout().getSequenceNoBits(),
-              request.getBlockIdLayout().getPartitionIdBits(),
-              request.getBlockIdLayout().getTaskAttemptIdBits());
-    }
+    BlockIdLayout blockIdLayout =
+        BlockIdLayout.from(
+            request.getBlockIdLayout().getSequenceNoBits(),
+            request.getBlockIdLayout().getPartitionIdBits(),
+            request.getBlockIdLayout().getTaskAttemptIdBits());
 
     StatusCode status = StatusCode.SUCCESS;
     String msg = "OK";