[#731] feat(spark): Make blockid layout configurable for Spark clients (#1528)

### What changes were proposed in this pull request?
Make bit-lengths in block id (block id layout) configurable through dynamic client config from coordinator or client config. Block id layout can be created from RssConf, or where that is not available, is being passed around.
 
- Adds new options (defaults are equivalent to current values in `Constants`):
  - rss.client.blockId.sequenceNoBits
  - rss.client.blockId.partitionIdBits
  - rss.client.blockId.taskAttemptIdBits
- Adds block id layout to two requests (default is layout with current values in `Constants`).

Default values have moved from `Constants` into `BlockIdLayout`. The following replacements exist:
- `PARTITION_ID_MAX_LENGTH`: `BlockIdLayout.DEFAULT.partitionIdBits`
- `TASK_ATTEMPT_ID_MAX_LENGTH`: `BlockIdLayout.DEFAULT.taskAttemptIdBits`
- `ATOMIC_INT_MAX_LENGTH`: `BlockIdLayout.DEFAULT.sequenceNoBits`

### Why are the changes needed?
The bit-lengths of sequence number, partition id and task attempt id in block id are defined in `common/src/main/java/org/apache/uniffle/common/util/Constants.java`. Changing these requires recompiling and redeploying the project. Making this configurable in `coordinator.conf`, `server.conf` or client-side would very useful.

Also see #1512, #749.

Fixes #731.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Tests.
diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index e92bf07..f122048 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -37,23 +37,24 @@
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.Constants;
 
 public class RssMRUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(RssMRUtils.class);
+  private static final BlockIdLayout LAYOUT = BlockIdLayout.DEFAULT;
   private static final int MAX_ATTEMPT_LENGTH = 6;
   private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
   private static final int MAX_SEQUENCE_NO =
-      (1 << (Constants.ATOMIC_INT_MAX_LENGTH - MAX_ATTEMPT_LENGTH)) - 1;
+      (1 << (LAYOUT.sequenceNoBits - MAX_ATTEMPT_LENGTH)) - 1;
 
   // Class TaskAttemptId have two field id and mapId, rss taskAttemptID have 21 bits,
   // mapId is 19 bits, id is 2 bits. MR have a trick logic, taskAttemptId will increase
   // 1000 * (appAttemptId - 1), so we will decrease it.
   public static long convertTaskAttemptIdToLong(TaskAttemptID taskAttemptID, int appAttemptId) {
     int lowBytes = taskAttemptID.getTaskID().getId();
-    if (lowBytes > Constants.MAX_TASK_ATTEMPT_ID) {
+    if (lowBytes > LAYOUT.maxTaskAttemptId) {
       throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " + lowBytes + " exceed");
     }
     if (appAttemptId < 1) {
@@ -64,7 +65,7 @@
       throw new RssException(
           "TaskAttempt " + taskAttemptID + " high bytes " + highBytes + " exceed");
     }
-    return BlockId.getBlockId(highBytes, 0, lowBytes);
+    return LAYOUT.getBlockId(highBytes, 0, lowBytes);
   }
 
   public static TaskAttemptID createMRTaskAttemptId(
@@ -72,8 +73,8 @@
     if (appAttemptId < 1) {
       throw new RssException("appAttemptId " + appAttemptId + " is wrong");
     }
-    TaskID taskID = new TaskID(jobID, taskType, BlockId.getTaskAttemptId(rssTaskAttemptId));
-    int id = BlockId.getSequenceNo(rssTaskAttemptId) + 1000 * (appAttemptId - 1);
+    TaskID taskID = new TaskID(jobID, taskType, LAYOUT.getTaskAttemptId(rssTaskAttemptId));
+    int id = LAYOUT.getSequenceNo(rssTaskAttemptId) + 1000 * (appAttemptId - 1);
     return new TaskAttemptID(taskID, id);
   }
 
@@ -227,8 +228,7 @@
   }
 
   public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {
-    long attemptId =
-        taskAttemptId >> (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH);
+    long attemptId = taskAttemptId >> (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits);
     if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
       throw new RssException(
           "Can't support attemptId [" + attemptId + "], the max value should be " + MAX_ATTEMPT_ID);
@@ -240,17 +240,15 @@
 
     int atomicInt = (int) ((nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId);
     long taskId =
-        taskAttemptId
-            - (attemptId
-                << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+        taskAttemptId - (attemptId << (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits));
 
-    return BlockId.getBlockId(atomicInt, partitionId, taskId);
+    return LAYOUT.getBlockId(atomicInt, partitionId, taskId);
   }
 
   public static long getTaskAttemptId(long blockId) {
-    int mapId = BlockId.getTaskAttemptId(blockId);
-    int attemptId = BlockId.getSequenceNo(blockId) & MAX_ATTEMPT_ID;
-    return BlockId.getBlockId(attemptId, 0, mapId);
+    int mapId = LAYOUT.getTaskAttemptId(blockId);
+    int attemptId = LAYOUT.getSequenceNo(blockId) & MAX_ATTEMPT_ID;
+    return LAYOUT.getBlockId(attemptId, 0, mapId);
   }
 
   public static int estimateTaskConcurrency(JobConf jobConf) {
diff --git a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
index c95951e..2d3710c 100644
--- a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
+++ b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
@@ -25,6 +25,7 @@
 
 import org.apache.uniffle.client.util.RssClientConfig;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.util.StorageType;
 
@@ -80,16 +81,16 @@
 
   @Test
   public void partitionIdConvertBlockTest() {
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
     JobID jobID = new JobID();
     TaskID taskId = new TaskID(jobID, TaskType.MAP, 233);
     TaskAttemptID taskAttemptID = new TaskAttemptID(taskId, 1);
     long taskAttemptId = RssMRUtils.convertTaskAttemptIdToLong(taskAttemptID, 1);
-    long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
+    long mask = (1L << layout.partitionIdBits) - 1;
     for (int partitionId = 0; partitionId <= 3000; partitionId++) {
       for (int seqNo = 0; seqNo <= 10; seqNo++) {
         long blockId = RssMRUtils.getBlockId(partitionId, taskAttemptId, seqNo);
-        int newPartitionId =
-            Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
+        int newPartitionId = Math.toIntExact((blockId >> layout.taskAttemptIdBits) & mask);
         assertEquals(partitionId, newPartitionId);
       }
     }
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index b339984..6c4c41a 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -51,7 +51,7 @@
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ChecksumUtils;
 
 public class WriteBufferManager extends MemoryConsumer {
@@ -97,6 +97,7 @@
   private boolean memorySpillEnabled;
   private int memorySpillTimeoutSec;
   private boolean isRowBased;
+  private BlockIdLayout blockIdLayout;
 
   public WriteBufferManager(
       int shuffleId,
@@ -162,6 +163,7 @@
     this.sendSizeLimit = rssConf.get(RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMITATION);
     this.memorySpillTimeoutSec = rssConf.get(RssSparkConfig.RSS_MEMORY_SPILL_TIMEOUT);
     this.memorySpillEnabled = rssConf.get(RssSparkConfig.RSS_MEMORY_SPILL_ENABLED);
+    this.blockIdLayout = BlockIdLayout.from(rssConf);
   }
 
   /** add serialized columnar data directly when integrate with gluten */
@@ -329,7 +331,8 @@
       compressTime += System.currentTimeMillis() - start;
     }
     final long crc32 = ChecksumUtils.getCrc32(compressed);
-    final long blockId = BlockId.getBlockId(getNextSeqNo(partitionId), partitionId, taskAttemptId);
+    final long blockId =
+        blockIdLayout.getBlockId(getNextSeqNo(partitionId), partitionId, taskAttemptId);
     uncompressedDataLen += data.length;
     shuffleWriteMetrics.incBytesWritten(compressed.length);
     // add memory to indicate bytes which will be sent to shuffle server
diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index fa7aa05..14c99b8 100644
--- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -23,7 +23,6 @@
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.MapOutputTracker;
@@ -51,8 +50,11 @@
   private Method unregisterAllMapOutputMethod;
   private Method registerShuffleMethod;
 
+  /** See static overload of this method. */
+  public abstract long getTaskAttemptIdForBlockId(int mapIndex, int attemptNo);
+
   /**
-   * Provides a task attempt id that is unique for a shuffle stage.
+   * Provides a task attempt id to be used in the block id, that is unique for a shuffle stage.
    *
    * <p>We are not using context.taskAttemptId() here as this is a monotonically increasing number
    * that is unique across the entire Spark app which can reach very large numbers, which can
@@ -64,8 +66,7 @@
    *
    * @return a task attempt id unique for a shuffle stage
    */
-  @VisibleForTesting
-  protected static long getTaskAttemptId(
+  protected static long getTaskAttemptIdForBlockId(
       int mapIndex, int attemptNo, int maxFailures, boolean speculation, int maxTaskAttemptIdBits) {
     // attempt number is zero based: 0, 1, …, maxFailures-1
     // max maxFailures < 1 is not allowed but for safety, we interpret that as maxFailures == 1
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
index 90505d9..f761c6e 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
@@ -37,7 +37,7 @@
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.config.RssConf;
-import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.storage.HadoopTestBase;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -76,6 +76,31 @@
         handler,
         blockNum,
         recordNum,
+        BlockIdLayout.DEFAULT,
+        expectedData,
+        blockIdBitmap,
+        keyPrefix,
+        serializer,
+        partitionID,
+        true);
+  }
+
+  protected void writeTestData(
+      ShuffleWriteHandler handler,
+      int blockNum,
+      int recordNum,
+      BlockIdLayout layout,
+      Map<String, String> expectedData,
+      Roaring64NavigableMap blockIdBitmap,
+      String keyPrefix,
+      Serializer serializer,
+      int partitionID)
+      throws Exception {
+    writeTestData(
+        handler,
+        blockNum,
+        recordNum,
+        layout,
         expectedData,
         blockIdBitmap,
         keyPrefix,
@@ -95,6 +120,31 @@
       int partitionID,
       boolean compress)
       throws Exception {
+    writeTestData(
+        handler,
+        blockNum,
+        recordNum,
+        BlockIdLayout.DEFAULT,
+        expectedData,
+        blockIdBitmap,
+        keyPrefix,
+        serializer,
+        partitionID,
+        compress);
+  }
+
+  protected void writeTestData(
+      ShuffleWriteHandler handler,
+      int blockNum,
+      int recordNum,
+      BlockIdLayout layout,
+      Map<String, String> expectedData,
+      Roaring64NavigableMap blockIdBitmap,
+      String keyPrefix,
+      Serializer serializer,
+      int partitionID,
+      boolean compress)
+      throws Exception {
     List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
     SerializerInstance serializerInstance = serializer.newInstance();
     for (int i = 0; i < blockNum; i++) {
@@ -106,7 +156,7 @@
         expectedData.put(key, value);
         writeData(serializeStream, key, value);
       }
-      long blockId = BlockId.getBlockId(atomicInteger.getAndIncrement(), partitionID, 0);
+      long blockId = layout.getBlockId(atomicInteger.getAndIncrement(), partitionID, 0);
       blockIdBitmap.add(blockId);
       blocks.add(createShuffleBlock(output.toBytes(), blockId, compress));
       serializeStream.close();
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
index 2962bc1..b7a44bf 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
@@ -20,6 +20,7 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -33,6 +34,9 @@
 import org.apache.spark.shuffle.RssSparkConfig;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -42,9 +46,8 @@
 import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssConf;
-import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
 import org.apache.uniffle.storage.util.StorageType;
 
@@ -65,8 +68,14 @@
   private ShuffleServerInfo ssi1 = new ShuffleServerInfo("host1-0", "host1", 0);
   private ShuffleServerInfo ssi2 = new ShuffleServerInfo("host2-0", "host2", 0);
 
-  @Test
-  public void readTest1() throws Exception {
+  public static Stream<Arguments> testBlockIdLayouts() {
+    return Stream.of(
+        Arguments.of(BlockIdLayout.DEFAULT), Arguments.of(BlockIdLayout.from(20, 21, 22)));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testBlockIdLayouts")
+  public void readTest1(BlockIdLayout layout) throws Exception {
     String basePath = HDFS_URI + "readTest1";
     HadoopShuffleWriteHandler writeHandler =
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
@@ -74,14 +83,15 @@
     Map<String, String> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
-    writeTestData(writeHandler, 2, 5, expectedData, blockIdBitmap, "key", KRYO_SERIALIZER, 0);
+    writeTestData(
+        writeHandler, 2, 5, layout, expectedData, blockIdBitmap, "key", KRYO_SERIALIZER, 0);
 
     RssShuffleDataIterator rssShuffleDataIterator =
         getDataIterator(basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1));
 
     validateResult(rssShuffleDataIterator, expectedData, 10);
 
-    blockIdBitmap.add(BlockId.getBlockId(Constants.MAX_SEQUENCE_NO, 0, 0));
+    blockIdBitmap.add(layout.getBlockId(layout.maxSequenceNo, 0, 0));
     rssShuffleDataIterator =
         getDataIterator(basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1));
     int recNum = 0;
@@ -270,7 +280,9 @@
         }
         fail(EXPECTED_EXCEPTION_MESSAGE);
       } catch (Exception e) {
-        assertTrue(e.getMessage().startsWith("Unexpected crc value"));
+        assertTrue(
+            e.getMessage()
+                .startsWith("Unexpected crc value for blockId[0 (seq: 0, part: 0, task: 0)]"));
       }
 
       try {
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
index c0fb191..38ebbbd 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
@@ -24,6 +24,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Stream;
 
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.reflect.FieldUtils;
@@ -38,11 +39,16 @@
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockIdLayout;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -84,18 +90,34 @@
     return conf;
   }
 
-  @Test
-  public void addRecordCompressedTest() throws Exception {
-    addRecord(true);
+  public static Stream<Arguments> testBlockIdLayouts() {
+    return Stream.of(
+        Arguments.of(BlockIdLayout.DEFAULT), Arguments.of(BlockIdLayout.from(20, 21, 22)));
   }
 
-  @Test
-  public void addRecordUnCompressedTest() throws Exception {
-    addRecord(false);
+  @ParameterizedTest
+  @MethodSource("testBlockIdLayouts")
+  public void addRecordCompressedTest(BlockIdLayout layout) throws Exception {
+    addRecord(true, layout);
   }
 
-  private void addRecord(boolean compress) throws IllegalAccessException {
+  @ParameterizedTest
+  @MethodSource("testBlockIdLayouts")
+  public void addRecordUnCompressedTest(BlockIdLayout layout) throws Exception {
+    addRecord(false, layout);
+  }
+
+  private void addRecord(boolean compress, BlockIdLayout layout) throws IllegalAccessException {
     SparkConf conf = getConf();
+    conf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key(),
+        String.valueOf(layout.sequenceNoBits));
+    conf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_PARTITION_ID_BITS.key(),
+        String.valueOf(layout.partitionIdBits));
+    conf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key(),
+        String.valueOf(layout.taskAttemptIdBits));
     if (!compress) {
       conf.set(RssSparkConfig.SPARK_SHUFFLE_COMPRESS_KEY, String.valueOf(false));
     }
@@ -122,6 +144,7 @@
     result = wbm.addRecord(0, testKey, testValue);
     // single buffer is full
     assertEquals(1, result.size());
+    assertEquals(layout.asBlockId(0, 0, 0), layout.asBlockId(result.get(0).getBlockId()));
     assertEquals(512, wbm.getAllocatedBytes());
     assertEquals(96, wbm.getUsedBytes());
     assertEquals(96, wbm.getInSendListBytes());
@@ -139,6 +162,12 @@
     wbm.addRecord(4, testKey, testValue);
     result = wbm.addRecord(5, testKey, testValue);
     assertEquals(6, result.size());
+    assertEquals(layout.asBlockId(1, 0, 0), layout.asBlockId(result.get(0).getBlockId()));
+    assertEquals(layout.asBlockId(0, 1, 0), layout.asBlockId(result.get(1).getBlockId()));
+    assertEquals(layout.asBlockId(0, 2, 0), layout.asBlockId(result.get(2).getBlockId()));
+    assertEquals(layout.asBlockId(0, 3, 0), layout.asBlockId(result.get(3).getBlockId()));
+    assertEquals(layout.asBlockId(0, 4, 0), layout.asBlockId(result.get(4).getBlockId()));
+    assertEquals(layout.asBlockId(0, 5, 0), layout.asBlockId(result.get(5).getBlockId()));
     assertEquals(512, wbm.getAllocatedBytes());
     assertEquals(288, wbm.getUsedBytes());
     assertEquals(288, wbm.getInSendListBytes());
@@ -221,28 +250,40 @@
     assertEquals(0, spyManager.getShuffleWriteMetrics().recordsWritten());
   }
 
-  @Test
-  public void createBlockIdTest() {
+  @ParameterizedTest
+  @MethodSource("testBlockIdLayouts")
+  public void createBlockIdTest(BlockIdLayout layout) {
     SparkConf conf = getConf();
+    conf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key(),
+        String.valueOf(layout.sequenceNoBits));
+    conf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_PARTITION_ID_BITS.key(),
+        String.valueOf(layout.partitionIdBits));
+    conf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key(),
+        String.valueOf(layout.taskAttemptIdBits));
+
     WriteBufferManager wbm = createManager(conf);
     WriterBuffer mockWriterBuffer = mock(WriterBuffer.class);
     when(mockWriterBuffer.getData()).thenReturn(new byte[] {});
     when(mockWriterBuffer.getMemoryUsed()).thenReturn(0);
     ShuffleBlockInfo sbi = wbm.createShuffleBlock(0, mockWriterBuffer);
+
     // seqNo = 0, partitionId = 0, taskId = 0
-    assertEquals(0L, sbi.getBlockId());
+    assertEquals(layout.asBlockId(0, 0, 0), layout.asBlockId(sbi.getBlockId()));
 
     // seqNo = 1, partitionId = 0, taskId = 0
     sbi = wbm.createShuffleBlock(0, mockWriterBuffer);
-    assertEquals(35184372088832L, sbi.getBlockId());
+    assertEquals(layout.asBlockId(1, 0, 0), layout.asBlockId(sbi.getBlockId()));
 
     // seqNo = 0, partitionId = 1, taskId = 0
     sbi = wbm.createShuffleBlock(1, mockWriterBuffer);
-    assertEquals(2097152L, sbi.getBlockId());
+    assertEquals(layout.asBlockId(0, 1, 0), layout.asBlockId(sbi.getBlockId()));
 
     // seqNo = 1, partitionId = 1, taskId = 0
     sbi = wbm.createShuffleBlock(1, mockWriterBuffer);
-    assertEquals(35184374185984L, sbi.getBlockId());
+    assertEquals(layout.asBlockId(1, 1, 0), layout.asBlockId(sbi.getBlockId()));
   }
 
   @Test
diff --git a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
index 3d8ea05..9440a69 100644
--- a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
+++ b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBaseTest.java
@@ -25,6 +25,7 @@
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.exception.RssException;
 
+import static org.apache.uniffle.shuffle.manager.RssShuffleManagerBase.getTaskAttemptIdForBlockId;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -57,102 +58,96 @@
     for (int maxFailures : Arrays.asList(-1, 0, 1)) {
       assertEquals(
           bits("0000|"),
-          RssShuffleManagerBase.getTaskAttemptId(0, 0, maxFailures, false, 10),
+          getTaskAttemptIdForBlockId(0, 0, maxFailures, false, 10),
           String.valueOf(maxFailures));
       assertEquals(
           bits("0001|"),
-          RssShuffleManagerBase.getTaskAttemptId(1, 0, maxFailures, false, 10),
+          getTaskAttemptIdForBlockId(1, 0, maxFailures, false, 10),
           String.valueOf(maxFailures));
       assertEquals(
           bits("0010|"),
-          RssShuffleManagerBase.getTaskAttemptId(2, 0, maxFailures, false, 10),
+          getTaskAttemptIdForBlockId(2, 0, maxFailures, false, 10),
           String.valueOf(maxFailures));
     }
 
     // maxFailures of 2
-    assertEquals(bits("000|0"), RssShuffleManagerBase.getTaskAttemptId(0, 0, 2, false, 10));
-    assertEquals(bits("000|1"), RssShuffleManagerBase.getTaskAttemptId(0, 1, 2, false, 10));
-    assertEquals(bits("001|0"), RssShuffleManagerBase.getTaskAttemptId(1, 0, 2, false, 10));
-    assertEquals(bits("001|1"), RssShuffleManagerBase.getTaskAttemptId(1, 1, 2, false, 10));
-    assertEquals(bits("010|0"), RssShuffleManagerBase.getTaskAttemptId(2, 0, 2, false, 10));
-    assertEquals(bits("010|1"), RssShuffleManagerBase.getTaskAttemptId(2, 1, 2, false, 10));
-    assertEquals(bits("011|0"), RssShuffleManagerBase.getTaskAttemptId(3, 0, 2, false, 10));
-    assertEquals(bits("011|1"), RssShuffleManagerBase.getTaskAttemptId(3, 1, 2, false, 10));
+    assertEquals(bits("000|0"), getTaskAttemptIdForBlockId(0, 0, 2, false, 10));
+    assertEquals(bits("000|1"), getTaskAttemptIdForBlockId(0, 1, 2, false, 10));
+    assertEquals(bits("001|0"), getTaskAttemptIdForBlockId(1, 0, 2, false, 10));
+    assertEquals(bits("001|1"), getTaskAttemptIdForBlockId(1, 1, 2, false, 10));
+    assertEquals(bits("010|0"), getTaskAttemptIdForBlockId(2, 0, 2, false, 10));
+    assertEquals(bits("010|1"), getTaskAttemptIdForBlockId(2, 1, 2, false, 10));
+    assertEquals(bits("011|0"), getTaskAttemptIdForBlockId(3, 0, 2, false, 10));
+    assertEquals(bits("011|1"), getTaskAttemptIdForBlockId(3, 1, 2, false, 10));
 
     // maxFailures of 3
-    assertEquals(bits("00|00"), RssShuffleManagerBase.getTaskAttemptId(0, 0, 3, false, 10));
-    assertEquals(bits("00|01"), RssShuffleManagerBase.getTaskAttemptId(0, 1, 3, false, 10));
-    assertEquals(bits("00|10"), RssShuffleManagerBase.getTaskAttemptId(0, 2, 3, false, 10));
-    assertEquals(bits("01|00"), RssShuffleManagerBase.getTaskAttemptId(1, 0, 3, false, 10));
-    assertEquals(bits("01|01"), RssShuffleManagerBase.getTaskAttemptId(1, 1, 3, false, 10));
-    assertEquals(bits("01|10"), RssShuffleManagerBase.getTaskAttemptId(1, 2, 3, false, 10));
-    assertEquals(bits("10|00"), RssShuffleManagerBase.getTaskAttemptId(2, 0, 3, false, 10));
-    assertEquals(bits("10|01"), RssShuffleManagerBase.getTaskAttemptId(2, 1, 3, false, 10));
-    assertEquals(bits("10|10"), RssShuffleManagerBase.getTaskAttemptId(2, 2, 3, false, 10));
-    assertEquals(bits("11|00"), RssShuffleManagerBase.getTaskAttemptId(3, 0, 3, false, 10));
-    assertEquals(bits("11|01"), RssShuffleManagerBase.getTaskAttemptId(3, 1, 3, false, 10));
-    assertEquals(bits("11|10"), RssShuffleManagerBase.getTaskAttemptId(3, 2, 3, false, 10));
+    assertEquals(bits("00|00"), getTaskAttemptIdForBlockId(0, 0, 3, false, 10));
+    assertEquals(bits("00|01"), getTaskAttemptIdForBlockId(0, 1, 3, false, 10));
+    assertEquals(bits("00|10"), getTaskAttemptIdForBlockId(0, 2, 3, false, 10));
+    assertEquals(bits("01|00"), getTaskAttemptIdForBlockId(1, 0, 3, false, 10));
+    assertEquals(bits("01|01"), getTaskAttemptIdForBlockId(1, 1, 3, false, 10));
+    assertEquals(bits("01|10"), getTaskAttemptIdForBlockId(1, 2, 3, false, 10));
+    assertEquals(bits("10|00"), getTaskAttemptIdForBlockId(2, 0, 3, false, 10));
+    assertEquals(bits("10|01"), getTaskAttemptIdForBlockId(2, 1, 3, false, 10));
+    assertEquals(bits("10|10"), getTaskAttemptIdForBlockId(2, 2, 3, false, 10));
+    assertEquals(bits("11|00"), getTaskAttemptIdForBlockId(3, 0, 3, false, 10));
+    assertEquals(bits("11|01"), getTaskAttemptIdForBlockId(3, 1, 3, false, 10));
+    assertEquals(bits("11|10"), getTaskAttemptIdForBlockId(3, 2, 3, false, 10));
 
     // maxFailures of 4
-    assertEquals(bits("00|00"), RssShuffleManagerBase.getTaskAttemptId(0, 0, 4, false, 10));
-    assertEquals(bits("00|01"), RssShuffleManagerBase.getTaskAttemptId(0, 1, 4, false, 10));
-    assertEquals(bits("00|10"), RssShuffleManagerBase.getTaskAttemptId(0, 2, 4, false, 10));
-    assertEquals(bits("00|11"), RssShuffleManagerBase.getTaskAttemptId(0, 3, 4, false, 10));
-    assertEquals(bits("01|00"), RssShuffleManagerBase.getTaskAttemptId(1, 0, 4, false, 10));
-    assertEquals(bits("01|01"), RssShuffleManagerBase.getTaskAttemptId(1, 1, 4, false, 10));
-    assertEquals(bits("01|10"), RssShuffleManagerBase.getTaskAttemptId(1, 2, 4, false, 10));
-    assertEquals(bits("01|11"), RssShuffleManagerBase.getTaskAttemptId(1, 3, 4, false, 10));
-    assertEquals(bits("10|00"), RssShuffleManagerBase.getTaskAttemptId(2, 0, 4, false, 10));
-    assertEquals(bits("10|01"), RssShuffleManagerBase.getTaskAttemptId(2, 1, 4, false, 10));
-    assertEquals(bits("10|10"), RssShuffleManagerBase.getTaskAttemptId(2, 2, 4, false, 10));
-    assertEquals(bits("10|11"), RssShuffleManagerBase.getTaskAttemptId(2, 3, 4, false, 10));
-    assertEquals(bits("11|00"), RssShuffleManagerBase.getTaskAttemptId(3, 0, 4, false, 10));
-    assertEquals(bits("11|01"), RssShuffleManagerBase.getTaskAttemptId(3, 1, 4, false, 10));
-    assertEquals(bits("11|10"), RssShuffleManagerBase.getTaskAttemptId(3, 2, 4, false, 10));
-    assertEquals(bits("11|11"), RssShuffleManagerBase.getTaskAttemptId(3, 3, 4, false, 10));
+    assertEquals(bits("00|00"), getTaskAttemptIdForBlockId(0, 0, 4, false, 10));
+    assertEquals(bits("00|01"), getTaskAttemptIdForBlockId(0, 1, 4, false, 10));
+    assertEquals(bits("00|10"), getTaskAttemptIdForBlockId(0, 2, 4, false, 10));
+    assertEquals(bits("00|11"), getTaskAttemptIdForBlockId(0, 3, 4, false, 10));
+    assertEquals(bits("01|00"), getTaskAttemptIdForBlockId(1, 0, 4, false, 10));
+    assertEquals(bits("01|01"), getTaskAttemptIdForBlockId(1, 1, 4, false, 10));
+    assertEquals(bits("01|10"), getTaskAttemptIdForBlockId(1, 2, 4, false, 10));
+    assertEquals(bits("01|11"), getTaskAttemptIdForBlockId(1, 3, 4, false, 10));
+    assertEquals(bits("10|00"), getTaskAttemptIdForBlockId(2, 0, 4, false, 10));
+    assertEquals(bits("10|01"), getTaskAttemptIdForBlockId(2, 1, 4, false, 10));
+    assertEquals(bits("10|10"), getTaskAttemptIdForBlockId(2, 2, 4, false, 10));
+    assertEquals(bits("10|11"), getTaskAttemptIdForBlockId(2, 3, 4, false, 10));
+    assertEquals(bits("11|00"), getTaskAttemptIdForBlockId(3, 0, 4, false, 10));
+    assertEquals(bits("11|01"), getTaskAttemptIdForBlockId(3, 1, 4, false, 10));
+    assertEquals(bits("11|10"), getTaskAttemptIdForBlockId(3, 2, 4, false, 10));
+    assertEquals(bits("11|11"), getTaskAttemptIdForBlockId(3, 3, 4, false, 10));
 
     // maxFailures of 5
-    assertEquals(bits("0|000"), RssShuffleManagerBase.getTaskAttemptId(0, 0, 5, false, 10));
-    assertEquals(bits("1|100"), RssShuffleManagerBase.getTaskAttemptId(1, 4, 5, false, 10));
+    assertEquals(bits("0|000"), getTaskAttemptIdForBlockId(0, 0, 5, false, 10));
+    assertEquals(bits("1|100"), getTaskAttemptIdForBlockId(1, 4, 5, false, 10));
 
     // test with ints that overflow into signed int and long
-    assertEquals(
-        Integer.MAX_VALUE,
-        RssShuffleManagerBase.getTaskAttemptId(Integer.MAX_VALUE, 0, 1, false, 31));
+    assertEquals(Integer.MAX_VALUE, getTaskAttemptIdForBlockId(Integer.MAX_VALUE, 0, 1, false, 31));
     assertEquals(
         (long) Integer.MAX_VALUE << 1 | 1,
-        RssShuffleManagerBase.getTaskAttemptId(Integer.MAX_VALUE, 1, 2, false, 32));
+        getTaskAttemptIdForBlockId(Integer.MAX_VALUE, 1, 2, false, 32));
     assertEquals(
         (long) Integer.MAX_VALUE << 2 | 3,
-        RssShuffleManagerBase.getTaskAttemptId(Integer.MAX_VALUE, 3, 4, false, 33));
+        getTaskAttemptIdForBlockId(Integer.MAX_VALUE, 3, 4, false, 33));
     assertEquals(
         (long) Integer.MAX_VALUE << 3 | 7,
-        RssShuffleManagerBase.getTaskAttemptId(Integer.MAX_VALUE, 7, 8, false, 34));
+        getTaskAttemptIdForBlockId(Integer.MAX_VALUE, 7, 8, false, 34));
 
     // test with attemptNo >= maxFailures
-    assertThrowsExactly(
-        RssException.class, () -> RssShuffleManagerBase.getTaskAttemptId(0, 1, -1, false, 10));
-    assertThrowsExactly(
-        RssException.class, () -> RssShuffleManagerBase.getTaskAttemptId(0, 1, 0, false, 10));
+    assertThrowsExactly(RssException.class, () -> getTaskAttemptIdForBlockId(0, 1, -1, false, 10));
+    assertThrowsExactly(RssException.class, () -> getTaskAttemptIdForBlockId(0, 1, 0, false, 10));
     for (int maxFailures : Arrays.asList(1, 2, 3, 4, 8, 128)) {
       assertThrowsExactly(
           RssException.class,
-          () -> RssShuffleManagerBase.getTaskAttemptId(0, maxFailures, maxFailures, false, 10),
+          () -> getTaskAttemptIdForBlockId(0, maxFailures, maxFailures, false, 10),
           String.valueOf(maxFailures));
       assertThrowsExactly(
           RssException.class,
-          () -> RssShuffleManagerBase.getTaskAttemptId(0, maxFailures + 1, maxFailures, false, 10),
+          () -> getTaskAttemptIdForBlockId(0, maxFailures + 1, maxFailures, false, 10),
           String.valueOf(maxFailures));
       assertThrowsExactly(
           RssException.class,
-          () -> RssShuffleManagerBase.getTaskAttemptId(0, maxFailures + 2, maxFailures, false, 10),
+          () -> getTaskAttemptIdForBlockId(0, maxFailures + 2, maxFailures, false, 10),
           String.valueOf(maxFailures));
       Exception e =
           assertThrowsExactly(
               RssException.class,
-              () ->
-                  RssShuffleManagerBase.getTaskAttemptId(
-                      0, maxFailures + 128, maxFailures, false, 10),
+              () -> getTaskAttemptIdForBlockId(0, maxFailures + 128, maxFailures, false, 10),
               String.valueOf(maxFailures));
       assertEquals(
           "Observing attempt number "
@@ -166,14 +161,14 @@
     // test with mapIndex that would require more than maxTaskAttemptBits
     Exception e =
         assertThrowsExactly(
-            RssException.class, () -> RssShuffleManagerBase.getTaskAttemptId(256, 0, 3, true, 10));
+            RssException.class, () -> getTaskAttemptIdForBlockId(256, 0, 3, true, 10));
     assertEquals(
         "Observing mapIndex[256] that would produce a taskAttemptId with 11 bits "
             + "which is larger than the allowed 10 bits (maxFailures[3], speculation[true]). "
             + "Please consider providing more bits for taskAttemptIds.",
         e.getMessage());
     // check that a lower mapIndex works as expected
-    assertEquals(bits("11111111|00"), RssShuffleManagerBase.getTaskAttemptId(255, 0, 3, true, 10));
+    assertEquals(bits("11111111|00"), getTaskAttemptIdForBlockId(255, 0, 3, true, 10));
   }
 
   @Test
@@ -188,89 +183,85 @@
       for (int attemptNo : Arrays.asList(0, 1)) {
         assertEquals(
             bits("0000|" + attemptNo),
-            RssShuffleManagerBase.getTaskAttemptId(0, attemptNo, maxFailures, true, 10),
+            getTaskAttemptIdForBlockId(0, attemptNo, maxFailures, true, 10),
             "maxFailures=" + maxFailures + ", attemptNo=" + attemptNo);
         assertEquals(
             bits("0001|" + attemptNo),
-            RssShuffleManagerBase.getTaskAttemptId(1, attemptNo, maxFailures, true, 10),
+            getTaskAttemptIdForBlockId(1, attemptNo, maxFailures, true, 10),
             "maxFailures=" + maxFailures + ", attemptNo=" + attemptNo);
         assertEquals(
             bits("0010|" + attemptNo),
-            RssShuffleManagerBase.getTaskAttemptId(2, attemptNo, maxFailures, true, 10),
+            getTaskAttemptIdForBlockId(2, attemptNo, maxFailures, true, 10),
             "maxFailures=" + maxFailures + ", attemptNo=" + attemptNo);
       }
     }
 
     // maxFailures of 2
-    assertEquals(bits("00|00"), RssShuffleManagerBase.getTaskAttemptId(0, 0, 2, true, 10));
-    assertEquals(bits("00|01"), RssShuffleManagerBase.getTaskAttemptId(0, 1, 2, true, 10));
-    assertEquals(bits("00|10"), RssShuffleManagerBase.getTaskAttemptId(0, 2, 2, true, 10));
-    assertEquals(bits("01|00"), RssShuffleManagerBase.getTaskAttemptId(1, 0, 2, true, 10));
-    assertEquals(bits("01|01"), RssShuffleManagerBase.getTaskAttemptId(1, 1, 2, true, 10));
-    assertEquals(bits("01|10"), RssShuffleManagerBase.getTaskAttemptId(1, 2, 2, true, 10));
-    assertEquals(bits("10|00"), RssShuffleManagerBase.getTaskAttemptId(2, 0, 2, true, 10));
-    assertEquals(bits("10|01"), RssShuffleManagerBase.getTaskAttemptId(2, 1, 2, true, 10));
-    assertEquals(bits("10|10"), RssShuffleManagerBase.getTaskAttemptId(2, 2, 2, true, 10));
-    assertEquals(bits("11|00"), RssShuffleManagerBase.getTaskAttemptId(3, 0, 2, true, 10));
-    assertEquals(bits("11|01"), RssShuffleManagerBase.getTaskAttemptId(3, 1, 2, true, 10));
-    assertEquals(bits("11|10"), RssShuffleManagerBase.getTaskAttemptId(3, 2, 2, true, 10));
+    assertEquals(bits("00|00"), getTaskAttemptIdForBlockId(0, 0, 2, true, 10));
+    assertEquals(bits("00|01"), getTaskAttemptIdForBlockId(0, 1, 2, true, 10));
+    assertEquals(bits("00|10"), getTaskAttemptIdForBlockId(0, 2, 2, true, 10));
+    assertEquals(bits("01|00"), getTaskAttemptIdForBlockId(1, 0, 2, true, 10));
+    assertEquals(bits("01|01"), getTaskAttemptIdForBlockId(1, 1, 2, true, 10));
+    assertEquals(bits("01|10"), getTaskAttemptIdForBlockId(1, 2, 2, true, 10));
+    assertEquals(bits("10|00"), getTaskAttemptIdForBlockId(2, 0, 2, true, 10));
+    assertEquals(bits("10|01"), getTaskAttemptIdForBlockId(2, 1, 2, true, 10));
+    assertEquals(bits("10|10"), getTaskAttemptIdForBlockId(2, 2, 2, true, 10));
+    assertEquals(bits("11|00"), getTaskAttemptIdForBlockId(3, 0, 2, true, 10));
+    assertEquals(bits("11|01"), getTaskAttemptIdForBlockId(3, 1, 2, true, 10));
+    assertEquals(bits("11|10"), getTaskAttemptIdForBlockId(3, 2, 2, true, 10));
 
     // maxFailures of 3
-    assertEquals(bits("00|00"), RssShuffleManagerBase.getTaskAttemptId(0, 0, 3, true, 10));
-    assertEquals(bits("00|01"), RssShuffleManagerBase.getTaskAttemptId(0, 1, 3, true, 10));
-    assertEquals(bits("00|10"), RssShuffleManagerBase.getTaskAttemptId(0, 2, 3, true, 10));
-    assertEquals(bits("00|11"), RssShuffleManagerBase.getTaskAttemptId(0, 3, 3, true, 10));
-    assertEquals(bits("01|00"), RssShuffleManagerBase.getTaskAttemptId(1, 0, 3, true, 10));
-    assertEquals(bits("01|01"), RssShuffleManagerBase.getTaskAttemptId(1, 1, 3, true, 10));
-    assertEquals(bits("01|10"), RssShuffleManagerBase.getTaskAttemptId(1, 2, 3, true, 10));
-    assertEquals(bits("01|11"), RssShuffleManagerBase.getTaskAttemptId(1, 3, 3, true, 10));
-    assertEquals(bits("10|00"), RssShuffleManagerBase.getTaskAttemptId(2, 0, 3, true, 10));
-    assertEquals(bits("10|01"), RssShuffleManagerBase.getTaskAttemptId(2, 1, 3, true, 10));
-    assertEquals(bits("10|10"), RssShuffleManagerBase.getTaskAttemptId(2, 2, 3, true, 10));
-    assertEquals(bits("10|11"), RssShuffleManagerBase.getTaskAttemptId(2, 3, 3, true, 10));
-    assertEquals(bits("11|00"), RssShuffleManagerBase.getTaskAttemptId(3, 0, 3, true, 10));
-    assertEquals(bits("11|01"), RssShuffleManagerBase.getTaskAttemptId(3, 1, 3, true, 10));
-    assertEquals(bits("11|10"), RssShuffleManagerBase.getTaskAttemptId(3, 2, 3, true, 10));
-    assertEquals(bits("11|11"), RssShuffleManagerBase.getTaskAttemptId(3, 3, 3, true, 10));
+    assertEquals(bits("00|00"), getTaskAttemptIdForBlockId(0, 0, 3, true, 10));
+    assertEquals(bits("00|01"), getTaskAttemptIdForBlockId(0, 1, 3, true, 10));
+    assertEquals(bits("00|10"), getTaskAttemptIdForBlockId(0, 2, 3, true, 10));
+    assertEquals(bits("00|11"), getTaskAttemptIdForBlockId(0, 3, 3, true, 10));
+    assertEquals(bits("01|00"), getTaskAttemptIdForBlockId(1, 0, 3, true, 10));
+    assertEquals(bits("01|01"), getTaskAttemptIdForBlockId(1, 1, 3, true, 10));
+    assertEquals(bits("01|10"), getTaskAttemptIdForBlockId(1, 2, 3, true, 10));
+    assertEquals(bits("01|11"), getTaskAttemptIdForBlockId(1, 3, 3, true, 10));
+    assertEquals(bits("10|00"), getTaskAttemptIdForBlockId(2, 0, 3, true, 10));
+    assertEquals(bits("10|01"), getTaskAttemptIdForBlockId(2, 1, 3, true, 10));
+    assertEquals(bits("10|10"), getTaskAttemptIdForBlockId(2, 2, 3, true, 10));
+    assertEquals(bits("10|11"), getTaskAttemptIdForBlockId(2, 3, 3, true, 10));
+    assertEquals(bits("11|00"), getTaskAttemptIdForBlockId(3, 0, 3, true, 10));
+    assertEquals(bits("11|01"), getTaskAttemptIdForBlockId(3, 1, 3, true, 10));
+    assertEquals(bits("11|10"), getTaskAttemptIdForBlockId(3, 2, 3, true, 10));
+    assertEquals(bits("11|11"), getTaskAttemptIdForBlockId(3, 3, 3, true, 10));
 
     // maxFailures of 4
-    assertEquals(bits("0|000"), RssShuffleManagerBase.getTaskAttemptId(0, 0, 4, true, 10));
-    assertEquals(bits("1|100"), RssShuffleManagerBase.getTaskAttemptId(1, 4, 4, true, 10));
+    assertEquals(bits("0|000"), getTaskAttemptIdForBlockId(0, 0, 4, true, 10));
+    assertEquals(bits("1|100"), getTaskAttemptIdForBlockId(1, 4, 4, true, 10));
 
     // test with ints that overflow into signed int and long
     assertEquals(
         (long) Integer.MAX_VALUE << 1,
-        RssShuffleManagerBase.getTaskAttemptId(Integer.MAX_VALUE, 0, 1, true, 32));
+        getTaskAttemptIdForBlockId(Integer.MAX_VALUE, 0, 1, true, 32));
     assertEquals(
         (long) Integer.MAX_VALUE << 1 | 1,
-        RssShuffleManagerBase.getTaskAttemptId(Integer.MAX_VALUE, 1, 1, true, 32));
+        getTaskAttemptIdForBlockId(Integer.MAX_VALUE, 1, 1, true, 32));
     assertEquals(
         (long) Integer.MAX_VALUE << 2 | 3,
-        RssShuffleManagerBase.getTaskAttemptId(Integer.MAX_VALUE, 3, 3, true, 33));
+        getTaskAttemptIdForBlockId(Integer.MAX_VALUE, 3, 3, true, 33));
     assertEquals(
         (long) Integer.MAX_VALUE << 3 | 7,
-        RssShuffleManagerBase.getTaskAttemptId(Integer.MAX_VALUE, 7, 7, true, 34));
+        getTaskAttemptIdForBlockId(Integer.MAX_VALUE, 7, 7, true, 34));
 
     // test with attemptNo > maxFailures (attemptNo == maxFailures allowed for speculation enabled)
-    assertThrowsExactly(
-        RssException.class, () -> RssShuffleManagerBase.getTaskAttemptId(0, 2, -1, true, 10));
-    assertThrowsExactly(
-        RssException.class, () -> RssShuffleManagerBase.getTaskAttemptId(0, 2, 0, true, 10));
+    assertThrowsExactly(RssException.class, () -> getTaskAttemptIdForBlockId(0, 2, -1, true, 10));
+    assertThrowsExactly(RssException.class, () -> getTaskAttemptIdForBlockId(0, 2, 0, true, 10));
     for (int maxFailures : Arrays.asList(1, 2, 3, 4, 8, 128)) {
       assertThrowsExactly(
           RssException.class,
-          () -> RssShuffleManagerBase.getTaskAttemptId(0, maxFailures + 1, maxFailures, true, 10),
+          () -> getTaskAttemptIdForBlockId(0, maxFailures + 1, maxFailures, true, 10),
           String.valueOf(maxFailures));
       assertThrowsExactly(
           RssException.class,
-          () -> RssShuffleManagerBase.getTaskAttemptId(0, maxFailures + 2, maxFailures, true, 10),
+          () -> getTaskAttemptIdForBlockId(0, maxFailures + 2, maxFailures, true, 10),
           String.valueOf(maxFailures));
       Exception e =
           assertThrowsExactly(
               RssException.class,
-              () ->
-                  RssShuffleManagerBase.getTaskAttemptId(
-                      0, maxFailures + 128, maxFailures, true, 10),
+              () -> getTaskAttemptIdForBlockId(0, maxFailures + 128, maxFailures, true, 10),
               String.valueOf(maxFailures));
       assertEquals(
           "Observing attempt number "
@@ -284,13 +275,13 @@
     // test with mapIndex that would require more than maxTaskAttemptBits
     Exception e =
         assertThrowsExactly(
-            RssException.class, () -> RssShuffleManagerBase.getTaskAttemptId(256, 0, 4, false, 10));
+            RssException.class, () -> getTaskAttemptIdForBlockId(256, 0, 4, false, 10));
     assertEquals(
         "Observing mapIndex[256] that would produce a taskAttemptId with 11 bits "
             + "which is larger than the allowed 10 bits (maxFailures[4], speculation[false]). "
             + "Please consider providing more bits for taskAttemptIds.",
         e.getMessage());
     // check that a lower mapIndex works as expected
-    assertEquals(bits("11111111|00"), RssShuffleManagerBase.getTaskAttemptId(255, 0, 4, false, 10));
+    assertEquals(bits("11111111|00"), getTaskAttemptIdForBlockId(255, 0, 4, false, 10));
   }
 }
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 4483cdd..3cb78dd 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -73,7 +73,7 @@
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.exception.RssFetchFailedException;
 import org.apache.uniffle.common.rpc.GrpcServer;
-import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.common.util.RssUtils;
@@ -109,6 +109,7 @@
   private boolean dynamicConfEnabled = false;
   private final int maxFailures;
   private final boolean speculation;
+  private final BlockIdLayout blockIdLayout;
   private final String user;
   private final String uuid;
   private DataPusher dataPusher;
@@ -145,6 +146,7 @@
     this.sparkConf = sparkConf;
     this.maxFailures = sparkConf.getInt("spark.task.maxFailures", 4);
     this.speculation = sparkConf.getBoolean("spark.speculation", false);
+    this.blockIdLayout = BlockIdLayout.from(RssSparkConfig.toRssConf(sparkConf));
     this.user = sparkConf.get("spark.rss.quota.user", "user");
     this.uuid = sparkConf.get("spark.rss.quota.uuid", Long.toString(System.currentTimeMillis()));
     // set & check replica config
@@ -263,6 +265,16 @@
   public <K, V, C> ShuffleHandle registerShuffle(
       int shuffleId, int numMaps, ShuffleDependency<K, V, C> dependency) {
 
+    // fail fast if number of partitions is not supported by block id layout
+    if (dependency.partitioner().numPartitions() > blockIdLayout.maxNumPartitions) {
+      throw new RssException(
+          "Cannot register shuffle with "
+              + dependency.partitioner().numPartitions()
+              + " partitions because the configured block id layout supports at most "
+              + blockIdLayout.maxNumPartitions
+              + " partitions.");
+    }
+
     // Spark have three kinds of serializer:
     // org.apache.spark.serializer.JavaSerializer
     // org.apache.spark.sql.execution.UnsafeRowSerializer
@@ -466,18 +478,11 @@
                 shuffleId, rssHandle.getPartitionToServers(), rssHandle.getRemoteStorage());
       }
       ShuffleWriteMetrics writeMetrics = context.taskMetrics().shuffleWriteMetrics();
-      long taskAttemptId =
-          getTaskAttemptId(
-              context.partitionId(),
-              context.attemptNumber(),
-              maxFailures,
-              speculation,
-              Constants.TASK_ATTEMPT_ID_MAX_LENGTH);
       return new RssShuffleWriter<>(
           rssHandle.getAppId(),
           shuffleId,
           taskId,
-          taskAttemptId,
+          getTaskAttemptIdForBlockId(context.partitionId(), context.attemptNumber()),
           writeMetrics,
           this,
           sparkConf,
@@ -491,6 +496,12 @@
     }
   }
 
+  @Override
+  public long getTaskAttemptIdForBlockId(int mapIndex, int attemptNo) {
+    return getTaskAttemptIdForBlockId(
+        mapIndex, attemptNo, maxFailures, speculation, blockIdLayout.taskAttemptIdBits);
+  }
+
   // This method is called in Spark executor,
   // getting information from Spark driver via the ShuffleHandle.
   @Override
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index b223cea..59e19d1 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -80,7 +80,7 @@
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.exception.RssFetchFailedException;
 import org.apache.uniffle.common.rpc.GrpcServer;
-import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.common.util.RssUtils;
@@ -110,6 +110,7 @@
   private boolean heartbeatStarted = false;
   private boolean dynamicConfEnabled = false;
   private final ShuffleDataDistributionType dataDistributionType;
+  private final BlockIdLayout blockIdLayout;
   private final int maxConcurrencyPerPartitionToWrite;
   private final int maxFailures;
   private final boolean speculation;
@@ -183,6 +184,7 @@
     this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
     this.dynamicConfEnabled = sparkConf.get(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED);
     this.dataDistributionType = getDataDistributionType(sparkConf);
+    this.blockIdLayout = BlockIdLayout.from(RssSparkConfig.toRssConf(sparkConf));
     this.maxConcurrencyPerPartitionToWrite =
         RssSparkConfig.toRssConf(sparkConf).get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
     this.maxFailures = sparkConf.getInt("spark.task.maxFailures", 4);
@@ -308,6 +310,7 @@
     this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
     this.dataDistributionType =
         RssSparkConfig.toRssConf(sparkConf).get(RssClientConf.DATA_DISTRIBUTION_TYPE);
+    this.blockIdLayout = BlockIdLayout.from(RssSparkConfig.toRssConf(sparkConf));
     this.maxConcurrencyPerPartitionToWrite =
         RssSparkConfig.toRssConf(sparkConf).get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
     this.maxFailures = sparkConf.getInt("spark.task.maxFailures", 4);
@@ -372,6 +375,16 @@
   public <K, V, C> ShuffleHandle registerShuffle(
       int shuffleId, ShuffleDependency<K, V, C> dependency) {
 
+    // fail fast if number of partitions is not supported by block id layout
+    if (dependency.partitioner().numPartitions() > blockIdLayout.maxNumPartitions) {
+      throw new RssException(
+          "Cannot register shuffle with "
+              + dependency.partitioner().numPartitions()
+              + " partitions because the configured block id layout supports at most "
+              + blockIdLayout.maxNumPartitions
+              + " partitions.");
+    }
+
     // Spark have three kinds of serializer:
     // org.apache.spark.serializer.JavaSerializer
     // org.apache.spark.sql.execution.UnsafeRowSerializer
@@ -507,18 +520,11 @@
     }
     String taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber();
     LOG.info("RssHandle appId {} shuffleId {} ", rssHandle.getAppId(), rssHandle.getShuffleId());
-    long taskAttemptId =
-        getTaskAttemptId(
-            context.partitionId(),
-            context.attemptNumber(),
-            maxFailures,
-            speculation,
-            Constants.TASK_ATTEMPT_ID_MAX_LENGTH);
     return new RssShuffleWriter<>(
         rssHandle.getAppId(),
         shuffleId,
         taskId,
-        taskAttemptId,
+        getTaskAttemptIdForBlockId(context.partitionId(), context.attemptNumber()),
         writeMetrics,
         this,
         sparkConf,
@@ -529,6 +535,12 @@
         shuffleHandleInfo);
   }
 
+  @Override
+  public long getTaskAttemptIdForBlockId(int mapIndex, int attemptNo) {
+    return getTaskAttemptIdForBlockId(
+        mapIndex, attemptNo, maxFailures, speculation, blockIdLayout.taskAttemptIdBits);
+  }
+
   public void setPusherAppId(RssShuffleHandle rssShuffleHandle) {
     // todo: this implement is tricky, we should refactor it
     if (id.get() == null) {
@@ -697,7 +709,8 @@
         shuffleRemoteStoragePath,
         readerHadoopConf,
         partitionNum,
-        RssUtils.generatePartitionToBitmap(blockIdBitmap, startPartition, endPartition),
+        RssUtils.generatePartitionToBitmap(
+            blockIdBitmap, startPartition, endPartition, blockIdLayout),
         taskIdBitmap,
         readMetrics,
         RssSparkConfig.toRssConf(sparkConf),
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTest.java
index 64bd6f9..2157cda 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTest.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTest.java
@@ -17,20 +17,29 @@
 
 package org.apache.spark.shuffle;
 
+import org.apache.spark.Partitioner;
+import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.internal.SQLConf;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import org.apache.uniffle.client.util.RssClientConfig;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.apache.spark.shuffle.RssSparkConfig.RSS_SHUFFLE_MANAGER_GRPC_PORT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class RssShuffleManagerTest extends RssShuffleManagerTestBase {
   private static final String SPARK_ADAPTIVE_EXECUTION_ENABLED_KEY = "spark.sql.adaptive.enabled";
@@ -117,4 +126,48 @@
     // by default, the appId is null
     assertNull(shuffleManager.getAppId());
   }
+
+  @ParameterizedTest
+  @ValueSource(ints = {16, 20, 24})
+  public void testRssShuffleManagerRegisterShuffle(int partitionIdBits) {
+    BlockIdLayout layout =
+        BlockIdLayout.from(
+            63 - partitionIdBits - partitionIdBits - 2, partitionIdBits, partitionIdBits + 2);
+
+    SparkConf conf = new SparkConf();
+    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+    conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+    conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+    conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
+    conf.set("spark.task.maxFailures", "4");
+
+    conf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key(),
+        String.valueOf(layout.sequenceNoBits));
+    conf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_PARTITION_ID_BITS.key(),
+        String.valueOf(layout.partitionIdBits));
+    conf.set(
+        RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key(),
+        String.valueOf(layout.taskAttemptIdBits));
+
+    // register a shuffle with too many partitions should fail
+    Partitioner mockPartitioner = mock(Partitioner.class);
+    when(mockPartitioner.numPartitions()).thenReturn(layout.maxNumPartitions + 1);
+    ShuffleDependency<String, String, String> mockDependency = mock(ShuffleDependency.class);
+    when(mockDependency.partitioner()).thenReturn(mockPartitioner);
+
+    RssShuffleManager shuffleManager = new RssShuffleManager(conf, true);
+
+    RssException e =
+        assertThrowsExactly(
+            RssException.class, () -> shuffleManager.registerShuffle(0, mockDependency));
+    assertEquals(
+        "Cannot register shuffle with "
+            + (layout.maxNumPartitions + 1)
+            + " partitions because the configured block id layout supports at most "
+            + layout.maxNumPartitions
+            + " partitions.",
+        e.getMessage());
+  }
 }
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
index c1e8643..c9a762f 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
@@ -53,17 +53,17 @@
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.Constants;
 
 public class RssTezUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(RssTezUtils.class);
-
+  private static final BlockIdLayout LAYOUT = BlockIdLayout.DEFAULT;
   private static final int MAX_ATTEMPT_LENGTH = 6;
   private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
   private static final int MAX_SEQUENCE_NO =
-      (1 << (Constants.ATOMIC_INT_MAX_LENGTH - MAX_ATTEMPT_LENGTH)) - 1;
+      (1 << (LAYOUT.sequenceNoBits - MAX_ATTEMPT_LENGTH)) - 1;
 
   public static final String HOST_NAME = "hostname";
 
@@ -164,8 +164,7 @@
         partitionId,
         taskAttemptId,
         nextSeqNo);
-    long attemptId =
-        taskAttemptId >> (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH);
+    long attemptId = taskAttemptId >> (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits);
     if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
       throw new RssException(
           "Can't support attemptId [" + attemptId + "], the max value should be " + MAX_ATTEMPT_ID);
@@ -177,17 +176,15 @@
 
     int atomicInt = (int) ((nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId);
     long taskId =
-        taskAttemptId
-            - (attemptId
-                << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+        taskAttemptId - (attemptId << (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits));
 
-    return BlockId.getBlockId(atomicInt, partitionId, taskId);
+    return LAYOUT.getBlockId(atomicInt, partitionId, taskId);
   }
 
   public static long getTaskAttemptId(long blockId) {
-    int mapId = BlockId.getTaskAttemptId(blockId);
-    int attemptId = BlockId.getSequenceNo(blockId) & MAX_ATTEMPT_ID;
-    return BlockId.getBlockId(attemptId, 0, mapId);
+    int mapId = LAYOUT.getTaskAttemptId(blockId);
+    int attemptId = LAYOUT.getSequenceNo(blockId) & MAX_ATTEMPT_ID;
+    return LAYOUT.getBlockId(attemptId, 0, mapId);
   }
 
   public static int estimateTaskConcurrency(Configuration jobConf, int mapNum, int reduceNum) {
@@ -281,7 +278,7 @@
 
   public static long convertTaskAttemptIdToLong(TezTaskAttemptID taskAttemptID) {
     int lowBytes = taskAttemptID.getTaskID().getId();
-    if (lowBytes > Constants.MAX_TASK_ATTEMPT_ID) {
+    if (lowBytes > LAYOUT.maxTaskAttemptId) {
       throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " + lowBytes + " exceed");
     }
     int highBytes = taskAttemptID.getId();
@@ -289,7 +286,7 @@
       throw new RssException(
           "TaskAttempt " + taskAttemptID + " high bytes " + highBytes + " exceed.");
     }
-    long id = BlockId.getBlockId(highBytes, 0, lowBytes);
+    long id = LAYOUT.getBlockId(highBytes, 0, lowBytes);
     LOG.info("ConvertTaskAttemptIdToLong taskAttemptID:{}, id is {}, .", taskAttemptID, id);
     return id;
   }
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
index 3892458..c9a7c47 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
@@ -53,7 +53,7 @@
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 
@@ -96,6 +96,7 @@
   private final long sendCheckTimeout;
   private final int bitmapSplitNum;
   private final long taskAttemptId;
+  private final BlockIdLayout blockIdLayout;
   private TezTaskAttemptID tezTaskAttemptID;
   private final RssConf rssConf;
   private final int shuffleId;
@@ -136,6 +137,7 @@
     this.maxMemSize = maxMemSize;
     this.appId = appId;
     this.taskAttemptId = taskAttemptId;
+    this.blockIdLayout = BlockIdLayout.from(rssConf);
     this.successBlockIds = successBlockIds;
     this.failedBlockIds = failedBlockIds;
     this.shuffleWriteClient = shuffleWriteClient;
@@ -373,7 +375,7 @@
     compressTime += System.currentTimeMillis() - start;
     final long blockId =
         RssTezUtils.getBlockId(partitionId, taskAttemptId, getNextSeqNo(partitionId));
-    LOG.info("blockId is {}", BlockId.fromLong(blockId));
+    LOG.info("blockId is {}", blockIdLayout.asBlockId(blockId));
     uncompressedDataLen += data.length;
     // add memory to indicate bytes which will be sent to shuffle server
     inSendListBytes.addAndGet(wb.getDataLength());
diff --git a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
index 639521f..c71dbef 100644
--- a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
+++ b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
@@ -33,6 +33,7 @@
 
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.util.StorageType;
 
@@ -90,18 +91,18 @@
 
   @Test
   public void testPartitionIdConvertBlock() {
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
     ApplicationId appId = ApplicationId.newInstance(9999, 72);
     TezDAGID dagId = TezDAGID.getInstance(appId, 1);
     TezVertexID vId = TezVertexID.getInstance(dagId, 35);
     TezTaskID tId = TezTaskID.getInstance(vId, 389);
     TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tId, 2);
     long taskAttemptId = RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId);
-    long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
+    long mask = (1L << layout.partitionIdBits) - 1;
     for (int partitionId = 0; partitionId <= 3000; partitionId++) {
       for (int seqNo = 0; seqNo <= 10; seqNo++) {
         long blockId = RssTezUtils.getBlockId(partitionId, taskAttemptId, seqNo);
-        int newPartitionId =
-            Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
+        int newPartitionId = Math.toIntExact((blockId >> layout.taskAttemptIdBits) & mask);
         assertEquals(partitionId, newPartitionId);
       }
     }
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 2831812..49bb2de 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -25,6 +25,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
@@ -41,7 +42,7 @@
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssFetchFailedException;
-import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.common.util.IdHelper;
 import org.apache.uniffle.common.util.RssUtils;
@@ -67,19 +68,27 @@
   private AtomicLong crcCheckTime = new AtomicLong(0);
   private ClientReadHandler clientReadHandler;
   private IdHelper idHelper;
+  private BlockIdLayout blockIdLayout;
 
   public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder builder) {
     // add default value
-    if (builder.getIdHelper() == null) {
-      builder.idHelper(new DefaultIdHelper());
-    }
     if (builder.getShuffleDataDistributionType() == null) {
       builder.shuffleDataDistributionType(ShuffleDataDistributionType.NORMAL);
     }
     if (builder.getHadoopConf() == null) {
       builder.hadoopConf(new Configuration());
     }
-    if (builder.getRssConf() != null) {
+    if (builder.getRssConf() != null
+        &&
+        // if rssConf contains only block id config, consider this as test mode as well
+        !builder
+            .getRssConf()
+            .getKeySet()
+            .equals(
+                Sets.newHashSet(
+                    RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key(),
+                    RssClientConf.BLOCKID_PARTITION_ID_BITS.key(),
+                    RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key()))) {
       final int indexReadLimit = builder.getRssConf().get(RssClientConf.RSS_INDEX_READ_LIMIT);
       final String storageType = builder.getRssConf().get(RssClientConf.RSS_STORAGE_TYPE);
       long readBufferSize =
@@ -101,17 +110,32 @@
       builder.clientType(builder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE));
     } else {
       // most for test
-      RssConf rssConf = new RssConf();
+      RssConf rssConf = (builder.getRssConf() == null) ? new RssConf() : builder.getRssConf();
       rssConf.set(RssClientConf.RSS_STORAGE_TYPE, builder.getStorageType());
       rssConf.set(RssClientConf.RSS_INDEX_READ_LIMIT, builder.getIndexReadLimit());
       rssConf.set(
           RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE, String.valueOf(builder.getReadBufferSize()));
+      if (!rssConf.contains(RssClientConf.BLOCKID_SEQUENCE_NO_BITS)) {
+        rssConf.setInteger(
+            RssClientConf.BLOCKID_SEQUENCE_NO_BITS, BlockIdLayout.DEFAULT.sequenceNoBits);
+      }
+      if (!rssConf.contains(RssClientConf.BLOCKID_PARTITION_ID_BITS)) {
+        rssConf.setInteger(
+            RssClientConf.BLOCKID_PARTITION_ID_BITS, BlockIdLayout.DEFAULT.partitionIdBits);
+      }
+      if (!rssConf.contains(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS)) {
+        rssConf.setInteger(
+            RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS, BlockIdLayout.DEFAULT.taskAttemptIdBits);
+      }
 
       builder.rssConf(rssConf);
       builder.offHeapEnable(false);
       builder.expectedTaskIdsBitmapFilterEnable(false);
       builder.clientType(rssConf.get(RssClientConf.RSS_CLIENT_TYPE));
     }
+    if (builder.getIdHelper() == null) {
+      builder.idHelper(new DefaultIdHelper(BlockIdLayout.from(builder.getRssConf())));
+    }
 
     init(builder);
   }
@@ -123,6 +147,7 @@
     this.taskIdBitmap = builder.getTaskIdBitmap();
     this.idHelper = builder.getIdHelper();
     this.shuffleServerInfoList = builder.getShuffleServerInfoList();
+    this.blockIdLayout = BlockIdLayout.from(builder.getRssConf());
 
     CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest();
     request.setStorageType(builder.getStorageType());
@@ -212,13 +237,13 @@
             actualCrc = ChecksumUtils.getCrc32(readBuffer, bs.getOffset(), bs.getLength());
             crcCheckTime.addAndGet(System.currentTimeMillis() - start);
           } catch (Exception e) {
-            LOG.warn("Can't read data for " + BlockId.toString(bs.getBlockId()), e);
+            LOG.warn("Can't read data for " + blockIdLayout.asBlockId(bs.getBlockId()), e);
           }
 
           if (expectedCrc != actualCrc) {
             String errMsg =
                 "Unexpected crc value for "
-                    + BlockId.toString(bs.getBlockId())
+                    + blockIdLayout.asBlockId(bs.getBlockId())
                     + ", expected:"
                     + expectedCrc
                     + ", actual:"
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 7f40085..129dadc 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -89,6 +89,7 @@
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.exception.RssFetchFailedException;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 
@@ -115,6 +116,7 @@
   private final int unregisterRequestTimeSec;
   private Set<ShuffleServerInfo> defectiveServers;
   private RssConf rssConf;
+  private BlockIdLayout blockIdLayout;
 
   public ShuffleWriteClientImpl(ShuffleClientFactory.WriteClientBuilder builder) {
     // set default value
@@ -147,6 +149,7 @@
       defectiveServers = Sets.newConcurrentHashSet();
     }
     this.rssConf = builder.getRssConf();
+    this.blockIdLayout = BlockIdLayout.from(rssConf);
   }
 
   private boolean sendShuffleDataAsync(
@@ -770,7 +773,7 @@
       int shuffleId,
       int partitionId) {
     RssGetShuffleResultRequest request =
-        new RssGetShuffleResultRequest(appId, shuffleId, partitionId);
+        new RssGetShuffleResultRequest(appId, shuffleId, partitionId, blockIdLayout);
     boolean isSuccessful = false;
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
     int successCnt = 0;
@@ -825,7 +828,8 @@
         }
       }
       RssGetShuffleResultForMultiPartRequest request =
-          new RssGetShuffleResultForMultiPartRequest(appId, shuffleId, requestPartitions);
+          new RssGetShuffleResultForMultiPartRequest(
+              appId, shuffleId, requestPartitions, blockIdLayout);
       try {
         RssGetShuffleResultResponse response =
             getShuffleServerClient(shuffleServerInfo).getShuffleResultForMultiPart(request);
diff --git a/client/src/main/java/org/apache/uniffle/client/util/DefaultIdHelper.java b/client/src/main/java/org/apache/uniffle/client/util/DefaultIdHelper.java
index 7161c4f..dc7683c 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/DefaultIdHelper.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/DefaultIdHelper.java
@@ -17,12 +17,18 @@
 
 package org.apache.uniffle.client.util;
 
-import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.IdHelper;
 
 public class DefaultIdHelper implements IdHelper {
+  private final BlockIdLayout layout;
+
+  public DefaultIdHelper(BlockIdLayout layout) {
+    this.layout = layout;
+  }
+
   @Override
   public long getTaskAttemptId(long blockId) {
-    return BlockId.getTaskAttemptId(blockId);
+    return layout.getTaskAttemptId(blockId);
   }
 }
diff --git a/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java b/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
index 19ff6ac..dd9ef62 100644
--- a/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
@@ -33,7 +33,7 @@
 
 import org.apache.uniffle.client.util.ClientUtils;
 import org.apache.uniffle.client.util.DefaultIdHelper;
-import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.RssUtils;
 
 import static org.apache.uniffle.client.util.ClientUtils.waitUntilDoneOrFail;
@@ -48,18 +48,19 @@
   @Test
   public void testGenerateTaskIdBitMap() {
     int partitionId = 1;
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
     Roaring64NavigableMap blockIdMap = Roaring64NavigableMap.bitmapOf();
     int taskSize = 10;
     long[] except = new long[taskSize];
     for (int i = 0; i < taskSize; i++) {
       except[i] = i;
       for (int j = 0; j < 100; j++) {
-        long blockId = BlockId.getBlockId(j, partitionId, i);
+        long blockId = layout.getBlockId(j, partitionId, i);
         blockIdMap.addLong(blockId);
       }
     }
     Roaring64NavigableMap taskIdBitMap =
-        RssUtils.generateTaskIdBitMap(blockIdMap, new DefaultIdHelper());
+        RssUtils.generateTaskIdBitMap(blockIdMap, new DefaultIdHelper(layout));
     assertEquals(taskSize, taskIdBitMap.getLongCardinality());
     LongIterator longIterator = taskIdBitMap.getLongIterator();
     for (int i = 0; i < taskSize; i++) {
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
index f240b12..4c0679a 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
@@ -38,14 +38,17 @@
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.HadoopTestBase;
 import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -93,8 +96,9 @@
     readClient.checkProcessedBlockIds();
     readClient.close();
 
-    blockIdBitmap.addLong(BlockId.getBlockId(0, 0, Constants.MAX_TASK_ATTEMPT_ID - 1));
-    taskIdBitmap.addLong(Constants.MAX_TASK_ATTEMPT_ID - 1);
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
+    blockIdBitmap.addLong(layout.getBlockId(0, 0, layout.maxTaskAttemptId - 1));
+    taskIdBitmap.addLong(layout.maxTaskAttemptId - 1);
     readClient =
         baseReadBuilder()
             .partitionId(1)
@@ -347,7 +351,11 @@
         }
         fail(EXPECTED_EXCEPTION_MESSAGE);
       } catch (Exception e) {
-        assertTrue(e.getMessage().startsWith("Unexpected crc value"));
+        assertTrue(
+            e.getMessage()
+                .startsWith(
+                    "Unexpected crc value for blockId[5800000000000 (seq: 44, part: 0, task: 0)]"),
+            e.getMessage());
       }
 
       CompressedShuffleBlock block = readClient2.readShuffleBlockData();
@@ -378,6 +386,7 @@
     HadoopShuffleWriteHandler writeHandler =
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
 
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
@@ -385,9 +394,9 @@
     Roaring64NavigableMap wrongBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
     LongIterator iter = blockIdBitmap.getLongIterator();
     while (iter.hasNext()) {
-      BlockId blockId = BlockId.fromLong(iter.next());
+      BlockId blockId = layout.asBlockId(iter.next());
       wrongBlockIdBitmap.addLong(
-          BlockId.getBlockId(blockId.sequenceNo, blockId.partitionId + 1, blockId.taskAttemptId));
+          layout.getBlockId(blockId.sequenceNo, blockId.partitionId + 1, blockId.taskAttemptId));
     }
 
     ShuffleReadClientImpl readClient =
@@ -508,32 +517,75 @@
 
   @Test
   public void readTest13() throws Exception {
-    String basePath = HDFS_URI + "clientReadTest13";
+    doReadTest13(BlockIdLayout.DEFAULT);
+  }
+
+  @Test
+  public void readTest13b() throws Exception {
+    // This test is identical to readTest13, except that it does not use the default BlockIdLayout
+    // the layout is only used by IdHelper that extracts the task attempt id from the block id
+    // the partition id has to be larger than 0, so that it can leak into the task attempt id
+    // if the default layout is being used
+    BlockIdLayout layout = BlockIdLayout.from(22, 21, 20);
+    assertNotEquals(layout, BlockIdLayout.DEFAULT);
+    doReadTest13(layout);
+  }
+
+  public void doReadTest13(BlockIdLayout layout) throws Exception {
+    String basePath = HDFS_URI + "clientReadTest13-" + layout.hashCode();
     HadoopShuffleWriteHandler writeHandler =
         new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
     final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 3);
-    writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
+    writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap, layout);
     // test case: data generated by speculation task without report result
-    writeTestData(writeHandler, 5, 30, 1, 1, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
+    writeTestData(
+        writeHandler, 5, 30, 1, 1, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf(), layout);
     // test case: data generated by speculation task with report result
-    writeTestData(writeHandler, 5, 30, 1, 2, Maps.newHashMap(), blockIdBitmap);
-    writeTestData(writeHandler, 5, 30, 1, 3, expectedData, blockIdBitmap);
+    writeTestData(writeHandler, 5, 30, 1, 2, Maps.newHashMap(), blockIdBitmap, layout);
+    writeTestData(writeHandler, 5, 30, 1, 3, expectedData, blockIdBitmap, layout);
+
+    // we need to tell the read client about the blockId layout
+    RssConf rssConf = new RssConf();
+    rssConf.setInteger(RssClientConf.BLOCKID_SEQUENCE_NO_BITS, layout.sequenceNoBits);
+    rssConf.setInteger(RssClientConf.BLOCKID_PARTITION_ID_BITS, layout.partitionIdBits);
+    rssConf.setInteger(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS, layout.taskAttemptIdBits);
 
     // unexpected taskAttemptId should be filtered
+    assertEquals(15, blockIdBitmap.getIntCardinality());
     ShuffleReadClientImpl readClient =
         baseReadBuilder()
             .partitionId(1)
             .basePath(basePath)
             .blockIdBitmap(blockIdBitmap)
+            .partitionId(1)
             .taskIdBitmap(taskIdBitmap)
+            .rssConf(rssConf)
             .build();
+    // note that skipped block ids in blockIdBitmap will be removed by `build()`
+    assertEquals(10, blockIdBitmap.getIntCardinality());
     TestUtils.validateResult(readClient, expectedData);
     assertEquals(20, readClient.getProcessedBlockIds().getLongCardinality());
     readClient.checkProcessedBlockIds();
     readClient.close();
+
+    if (!layout.equals(BlockIdLayout.DEFAULT)) {
+      // creating a reader with a wrong block id layout will skip all blocks where task attempt id
+      // is not in taskIdBitmap
+      // the particular layout that created the block ids is incompatible with default layout, so
+      // all block ids will be skipped
+      // note that skipped block ids in blockIdBitmap will be removed by `build()`
+      baseReadBuilder()
+          .basePath(basePath)
+          .blockIdBitmap(blockIdBitmap)
+          .partitionId(1)
+          .taskIdBitmap(taskIdBitmap)
+          .build();
+      // note that skipped block ids in blockIdBitmap will be removed by `build()`
+      assertEquals(0, blockIdBitmap.getIntCardinality());
+    }
   }
 
   @Test
@@ -641,13 +693,14 @@
       int partitionId,
       long taskAttemptId,
       Map<Long, byte[]> expectedData,
-      Roaring64NavigableMap blockIdBitmap)
+      Roaring64NavigableMap blockIdBitmap,
+      BlockIdLayout layout)
       throws Exception {
     List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
     for (int i = 0; i < num; i++) {
       byte[] buf = new byte[length];
       new Random().nextBytes(buf);
-      long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), partitionId, taskAttemptId);
+      long blockId = layout.getBlockId(ATOMIC_INT.getAndIncrement(), partitionId, taskAttemptId);
       blocks.add(
           new ShufflePartitionedBlock(
               length, length, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf));
@@ -657,6 +710,26 @@
     writeHandler.write(blocks);
   }
 
+  private void writeTestData(
+      HadoopShuffleWriteHandler writeHandler,
+      int num,
+      int length,
+      int partitionId,
+      long taskAttemptId,
+      Map<Long, byte[]> expectedData,
+      Roaring64NavigableMap blockIdBitmap)
+      throws Exception {
+    writeTestData(
+        writeHandler,
+        num,
+        length,
+        partitionId,
+        taskAttemptId,
+        expectedData,
+        blockIdBitmap,
+        BlockIdLayout.DEFAULT);
+  }
+
   private void writeDuplicatedData(
       HadoopShuffleWriteHandler writeHandler,
       int num,
@@ -666,11 +739,12 @@
       Map<Long, byte[]> expectedData,
       Roaring64NavigableMap blockIdBitmap)
       throws Exception {
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
     List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
     for (int i = 0; i < num; i++) {
       byte[] buf = new byte[length];
       new Random().nextBytes(buf);
-      long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), partitionId, taskAttemptId);
+      long blockId = layout.getBlockId(ATOMIC_INT.getAndIncrement(), partitionId, taskAttemptId);
       ShufflePartitionedBlock spb =
           new ShufflePartitionedBlock(
               length, length, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf);
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
index f3008e9..d26e969 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
@@ -19,17 +19,25 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
+import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
 import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
 import org.apache.uniffle.client.response.SendShuffleDataResult;
 import org.apache.uniffle.common.ClientType;
@@ -37,14 +45,20 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.netty.IOMode;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.RssUtils;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ShuffleWriteClientImplTest {
@@ -317,4 +331,54 @@
     client.close();
     assertEquals(IOMode.EPOLL, ioMode);
   }
+
+  public static Stream<Arguments> testBlockIdLayouts() {
+    return Stream.of(
+        Arguments.of(BlockIdLayout.DEFAULT), Arguments.of(BlockIdLayout.from(20, 21, 22)));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testBlockIdLayouts")
+  public void testGetShuffleResult(BlockIdLayout layout) {
+    RssConf rssConf = new RssConf();
+    rssConf.set(RssClientConf.BLOCKID_SEQUENCE_NO_BITS, layout.sequenceNoBits);
+    rssConf.set(RssClientConf.BLOCKID_PARTITION_ID_BITS, layout.partitionIdBits);
+    rssConf.set(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS, layout.taskAttemptIdBits);
+    ShuffleWriteClientImpl shuffleWriteClient =
+        ShuffleClientFactory.newWriteBuilder()
+            .clientType(ClientType.GRPC.name())
+            .retryMax(3)
+            .retryIntervalMax(2000)
+            .heartBeatThreadNum(4)
+            .replica(1)
+            .replicaWrite(1)
+            .replicaRead(1)
+            .replicaSkipEnabled(true)
+            .dataTransferPoolSize(1)
+            .dataCommitPoolSize(1)
+            .unregisterThreadPoolSize(10)
+            .unregisterRequestTimeSec(10)
+            .rssConf(rssConf)
+            .build();
+    ShuffleServerClient mockShuffleServerClient = mock(ShuffleServerClient.class);
+    ShuffleWriteClientImpl spyClient = Mockito.spy(shuffleWriteClient);
+    doReturn(mockShuffleServerClient).when(spyClient).getShuffleServerClient(any());
+    RssGetShuffleResultResponse response;
+    try {
+      Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf(1L, 2L, 5L);
+      response = new RssGetShuffleResultResponse(StatusCode.SUCCESS, RssUtils.serializeBitMap(res));
+    } catch (Exception e) {
+      throw new RssException(e);
+    }
+    when(mockShuffleServerClient.getShuffleResult(any())).thenReturn(response);
+
+    Set<ShuffleServerInfo> shuffleServerInfoSet =
+        Sets.newHashSet(new ShuffleServerInfo("id", "host", 0));
+    Roaring64NavigableMap result =
+        spyClient.getShuffleResult("GRPC", shuffleServerInfoSet, "appId", 1, 2);
+
+    verify(mockShuffleServerClient)
+        .getShuffleResult(argThat(request -> request.getBlockIdLayout().equals(layout)));
+    assertArrayEquals(result.stream().sorted().toArray(), new long[] {1L, 2L, 5L});
+  }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/BufferSegment.java b/common/src/main/java/org/apache/uniffle/common/BufferSegment.java
index ac7faa4..1ab68c5 100644
--- a/common/src/main/java/org/apache/uniffle/common/BufferSegment.java
+++ b/common/src/main/java/org/apache/uniffle/common/BufferSegment.java
@@ -20,7 +20,6 @@
 import java.util.Objects;
 
 import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.BlockId;
 
 public class BufferSegment {
 
@@ -61,9 +60,9 @@
 
   @Override
   public String toString() {
-    return "BufferSegment{"
-        + BlockId.toString(blockId)
-        + ", taskAttemptId["
+    return "BufferSegment{blockId["
+        + blockId
+        + "], taskAttemptId["
         + taskAttemptId
         + "], offset["
         + offset
diff --git a/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java b/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java
index 8b5fdc6..8de75d9 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java
@@ -22,7 +22,6 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
-import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.ByteBufUtils;
 
 public class ShuffleBlockInfo {
@@ -137,7 +136,7 @@
     sb.append("ShuffleBlockInfo:");
     sb.append("shuffleId[" + shuffleId + "],");
     sb.append("partitionId[" + partitionId + "],");
-    sb.append(BlockId.toString(blockId) + ",");
+    sb.append("blockId[" + blockId + "],");
     sb.append("length[" + length + "],");
     sb.append("uncompressLength[" + uncompressLength + "],");
     sb.append("crc[" + crc + "],");
diff --git a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
index 4772b50..1ce68b6 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
@@ -22,8 +22,6 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
-import org.apache.uniffle.common.util.BlockId;
-
 public class ShufflePartitionedBlock {
 
   private int length;
@@ -125,9 +123,9 @@
 
   @Override
   public String toString() {
-    return "ShufflePartitionedBlock{"
-        + BlockId.toString(blockId)
-        + ", length["
+    return "ShufflePartitionedBlock{blockId["
+        + blockId
+        + "], length["
         + length
         + "], uncompressLength["
         + uncompressLength
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 006c9ab..be269ce 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -21,6 +21,7 @@
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.netty.IOMode;
+import org.apache.uniffle.common.util.BlockIdLayout;
 
 import static org.apache.uniffle.common.compression.Codec.Type.LZ4;
 
@@ -57,6 +58,36 @@
               "The type of partition shuffle data distribution, including normal and local_order. "
                   + "The default value is normal. This config is only valid in Spark3.x");
 
+  public static final ConfigOption<Integer> BLOCKID_SEQUENCE_NO_BITS =
+      ConfigOptions.key("rss.client.blockId.sequenceNoBits")
+          .intType()
+          .defaultValue(BlockIdLayout.DEFAULT.sequenceNoBits)
+          .withDescription(
+              "Block ids contain three fields: the sequence number, the partition id and "
+                  + "the task attempt id. This configures the bits reserved for the sequence "
+                  + "number. Each field can at most have 31 bits, while all fields together "
+                  + "must sum up to 63 bits.");
+
+  public static final ConfigOption<Integer> BLOCKID_PARTITION_ID_BITS =
+      ConfigOptions.key("rss.client.blockId.partitionIdBits")
+          .intType()
+          .defaultValue(BlockIdLayout.DEFAULT.partitionIdBits)
+          .withDescription(
+              "Block ids contain three fields: the sequence number, the partition id and "
+                  + "the task attempt id. This configures the bits reserved for the partition id. "
+                  + "Each field can at most have 31 bits, while all fields together "
+                  + "must sum up to 63 bits.");
+
+  public static final ConfigOption<Integer> BLOCKID_TASK_ATTEMPT_ID_BITS =
+      ConfigOptions.key("rss.client.blockId.taskAttemptIdBits")
+          .intType()
+          .defaultValue(BlockIdLayout.DEFAULT.taskAttemptIdBits)
+          .withDescription(
+              "Block ids contain three fields: the sequence number, the partition id and "
+                  + "the task attempt id. This configures the bits reserved for the task attempt id. "
+                  + "Each field can at most have 31 bits, while all fields together "
+                  + "must sum up to 63 bits.");
+
   public static final ConfigOption<Integer> MAX_CONCURRENCY_PER_PARTITION_TO_WRITE =
       ConfigOptions.key("rss.client.max.concurrency.of.per-partition.write")
           .intType()
diff --git a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
index 898d0b1..04763d8 100644
--- a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
+++ b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
@@ -29,7 +29,6 @@
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.BlockId;
 
 public class FixedSizeSegmentSplitter implements SegmentSplitter {
   private static final Logger LOGGER = LoggerFactory.getLogger(FixedSizeSegmentSplitter.class);
@@ -84,11 +83,11 @@
         if (dataFileLen != -1 && totalLength > dataFileLen) {
           LOGGER.info(
               "Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than "
-                  + "the real data file length: {}(bytes). Partition id: {}. "
+                  + "the real data file length: {}(bytes). Block id: {}"
                   + "This may happen when the data is flushing, please ignore.",
               totalLength,
               dataFileLen,
-              BlockId.getPartitionId(blockId));
+              blockId);
           break;
         }
 
diff --git a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
index cbf0979..366968c 100644
--- a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
+++ b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
@@ -31,7 +31,6 @@
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.BlockId;
 
 /**
  * {@class LocalOrderSegmentSplitter} will be initialized only when the {@class
@@ -106,11 +105,11 @@
         if (dataFileLen != -1 && totalLen > dataFileLen) {
           LOGGER.info(
               "Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than "
-                  + "the real data file length: {}(bytes). Partition id: {}. This should not happen. "
+                  + "the real data file length: {}(bytes). Block id: {}. This should not happen. "
                   + "This may happen when the data is flushing, please ignore.",
               totalLen,
               dataFileLen,
-              BlockId.getPartitionId(blockId));
+              blockId);
           break;
         }
 
diff --git a/common/src/main/java/org/apache/uniffle/common/util/BlockId.java b/common/src/main/java/org/apache/uniffle/common/util/BlockId.java
index 7aecaae..36025f6 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/BlockId.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/BlockId.java
@@ -17,19 +17,27 @@
 
 package org.apache.uniffle.common.util;
 
-// BlockId is positive long (63 bits) composed of partitionId, taskAttemptId and AtomicInteger.
-// AtomicInteger is highest 18 bits, max value is 2^18 - 1
-// partitionId is middle 24 bits, max value is 2^24 - 1
-// taskAttemptId is lowest 21 bits, max value is 2^21 - 1
-// Values of partitionId, taskAttemptId and AtomicInteger are always positive.
+import java.util.Objects;
+
+/**
+ * This represents a block id and all its constituents. This is particularly useful for logging and
+ * debugging block ids.
+ *
+ * <p>BlockId is positive long (63 bits) composed of sequenceNo, partitionId and taskAttemptId in
+ * that order from highest to lowest bits. The number of bits is defined by a {@link BlockIdLayout}.
+ * Values of partitionId, taskAttemptId and AtomicInteger are always positive.
+ */
 public class BlockId {
   public final long blockId;
+  public final BlockIdLayout layout;
   public final int sequenceNo;
   public final int partitionId;
   public final int taskAttemptId;
 
-  private BlockId(long blockId, int sequenceNo, int partitionId, int taskAttemptId) {
+  protected BlockId(
+      long blockId, BlockIdLayout layout, int sequenceNo, int partitionId, int taskAttemptId) {
     this.blockId = blockId;
+    this.layout = layout;
     this.sequenceNo = sequenceNo;
     this.partitionId = partitionId;
     this.taskAttemptId = taskAttemptId;
@@ -48,61 +56,20 @@
         + ")]";
   }
 
-  public static String toString(long blockId) {
-    return BlockId.fromLong(blockId).toString();
-  }
-
-  public static BlockId fromLong(long blockId) {
-    int sequenceNo = getSequenceNo(blockId);
-    int partitionId = getPartitionId(blockId);
-    int taskAttemptId = getTaskAttemptId(blockId);
-    return new BlockId(blockId, sequenceNo, partitionId, taskAttemptId);
-  }
-
-  public static BlockId fromIds(int sequenceNo, int partitionId, int taskAttemptId) {
-    long blockId = getBlockId(sequenceNo, partitionId, taskAttemptId);
-    return new BlockId(blockId, sequenceNo, partitionId, taskAttemptId);
-  }
-
-  public static long getBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
-    if (sequenceNo < 0 || sequenceNo > Constants.MAX_SEQUENCE_NO) {
-      throw new IllegalArgumentException(
-          "Can't support sequence["
-              + sequenceNo
-              + "], the max value should be "
-              + Constants.MAX_SEQUENCE_NO);
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
     }
-    if (partitionId < 0 || partitionId > Constants.MAX_PARTITION_ID) {
-      throw new IllegalArgumentException(
-          "Can't support partitionId["
-              + partitionId
-              + "], the max value should be "
-              + Constants.MAX_PARTITION_ID);
+    if (o == null || getClass() != o.getClass()) {
+      return false;
     }
-    if (taskAttemptId < 0 || taskAttemptId > Constants.MAX_TASK_ATTEMPT_ID) {
-      throw new IllegalArgumentException(
-          "Can't support taskAttemptId["
-              + taskAttemptId
-              + "], the max value should be "
-              + Constants.MAX_TASK_ATTEMPT_ID);
-    }
-
-    return ((long) sequenceNo
-            << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
-        + ((long) partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
-        + taskAttemptId;
+    BlockId blockId1 = (BlockId) o;
+    return blockId == blockId1.blockId && Objects.equals(layout, blockId1.layout);
   }
 
-  public static int getSequenceNo(long blockId) {
-    return (int)
-        (blockId >> (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
-  }
-
-  public static int getPartitionId(long blockId) {
-    return (int) ((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & Constants.MAX_PARTITION_ID);
-  }
-
-  public static int getTaskAttemptId(long blockId) {
-    return (int) (blockId & Constants.MAX_TASK_ATTEMPT_ID);
+  @Override
+  public int hashCode() {
+    return Objects.hash(blockId, layout);
   }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/util/BlockIdLayout.java b/common/src/main/java/org/apache/uniffle/common/util/BlockIdLayout.java
new file mode 100644
index 0000000..33d8cd6
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/util/BlockIdLayout.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.util.Objects;
+
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
+
+/**
+ * This represents the actual bit layout of {@link BlockId}s.
+ *
+ * <p>BlockId is positive long (63 bits) composed of sequenceNo, partitionId and taskAttemptId in
+ * that order from highest to lowest bits. The number of bits is defined by a {@link BlockIdLayout}.
+ * Values of partitionId, taskAttemptId and AtomicInteger are always positive.
+ */
+public class BlockIdLayout {
+
+  public static final BlockIdLayout DEFAULT = BlockIdLayout.from(18, 24, 21);
+
+  public final int sequenceNoBits;
+  public final int partitionIdBits;
+  public final int taskAttemptIdBits;
+
+  public final int sequenceNoOffset;
+  public final int partitionIdOffset;
+  public final int taskAttemptIdOffset;
+
+  public final long sequenceNoMask;
+  public final long partitionIdMask;
+  public final long taskAttemptIdMask;
+
+  public final int maxSequenceNo;
+  public final int maxPartitionId;
+  public final int maxTaskAttemptId;
+
+  public final int maxNumPartitions;
+
+  private BlockIdLayout(int sequenceNoBits, int partitionIdBits, int taskAttemptIdBits) {
+    // individual lengths must be lager than 0
+    if (sequenceNoBits <= 0 || partitionIdBits <= 0 || taskAttemptIdBits <= 0) {
+      throw new IllegalArgumentException(
+          "Don't support given lengths, individual lengths must be larger than 0: given "
+              + "sequenceNoBits="
+              + sequenceNoBits
+              + ", partitionIdBits="
+              + partitionIdBits
+              + ", taskAttemptIdBits="
+              + taskAttemptIdBits);
+    }
+    // individual lengths must be less than 32
+    if (sequenceNoBits >= 32 || partitionIdBits >= 32 || taskAttemptIdBits >= 32) {
+      throw new IllegalArgumentException(
+          "Don't support given lengths, individual lengths must be less that 32: given "
+              + "sequenceNoBits="
+              + sequenceNoBits
+              + ", partitionIdBits="
+              + partitionIdBits
+              + ", taskAttemptIdBits="
+              + taskAttemptIdBits);
+    }
+    // sum of individual lengths must be 63, otherwise we waste bits and risk overflow
+    if (sequenceNoBits + partitionIdBits + taskAttemptIdBits != 63) {
+      throw new IllegalArgumentException(
+          "Don't support given lengths, sum must be exactly 63: "
+              + sequenceNoBits
+              + " + "
+              + partitionIdBits
+              + " + "
+              + taskAttemptIdBits
+              + " = "
+              + (sequenceNoBits + partitionIdBits + taskAttemptIdBits));
+    }
+
+    this.sequenceNoBits = sequenceNoBits;
+    this.partitionIdBits = partitionIdBits;
+    this.taskAttemptIdBits = taskAttemptIdBits;
+
+    // fields are right-aligned
+    this.sequenceNoOffset = partitionIdBits + taskAttemptIdBits;
+    this.partitionIdOffset = taskAttemptIdBits;
+    this.taskAttemptIdOffset = 0;
+
+    // compute max ids
+    this.maxSequenceNo = (1 << sequenceNoBits) - 1;
+    this.maxPartitionId = (1 << partitionIdBits) - 1;
+    this.maxTaskAttemptId = (1 << taskAttemptIdBits) - 1;
+
+    // compute max nums
+    this.maxNumPartitions = this.maxPartitionId + 1;
+
+    // compute masks to simplify bit logic in BlockId methods
+    this.sequenceNoMask = (long) maxSequenceNo << sequenceNoOffset;
+    this.partitionIdMask = (long) maxPartitionId << partitionIdOffset;
+    this.taskAttemptIdMask = (long) maxTaskAttemptId << taskAttemptIdOffset;
+  }
+
+  @Override
+  public String toString() {
+    return "blockIdLayout["
+        + "seq: "
+        + sequenceNoBits
+        + " bits, part: "
+        + partitionIdBits
+        + " bits, task: "
+        + taskAttemptIdBits
+        + " bits]";
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    BlockIdLayout that = (BlockIdLayout) o;
+    return sequenceNoBits == that.sequenceNoBits
+        && partitionIdBits == that.partitionIdBits
+        && taskAttemptIdBits == that.taskAttemptIdBits;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(sequenceNoBits, partitionIdBits, taskAttemptIdBits);
+  }
+
+  public long getBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
+    if (sequenceNo < 0 || sequenceNo > maxSequenceNo) {
+      throw new IllegalArgumentException(
+          "Don't support sequenceNo[" + sequenceNo + "], the max value should be " + maxSequenceNo);
+    }
+    if (partitionId < 0 || partitionId > maxPartitionId) {
+      throw new IllegalArgumentException(
+          "Don't support partitionId["
+              + partitionId
+              + "], the max value should be "
+              + maxPartitionId);
+    }
+    if (taskAttemptId < 0 || taskAttemptId > maxTaskAttemptId) {
+      throw new IllegalArgumentException(
+          "Don't support taskAttemptId["
+              + taskAttemptId
+              + "], the max value should be "
+              + maxTaskAttemptId);
+    }
+
+    return (long) sequenceNo << sequenceNoOffset
+        | (long) partitionId << partitionIdOffset
+        | taskAttemptId << taskAttemptIdOffset;
+  }
+
+  public int getSequenceNo(long blockId) {
+    return (int) ((blockId & sequenceNoMask) >> sequenceNoOffset);
+  }
+
+  public int getPartitionId(long blockId) {
+    return (int) ((blockId & partitionIdMask) >> partitionIdOffset);
+  }
+
+  public int getTaskAttemptId(long blockId) {
+    return (int) ((blockId & taskAttemptIdMask) >> taskAttemptIdOffset);
+  }
+
+  public BlockId asBlockId(long blockId) {
+    return new BlockId(
+        blockId, this, getSequenceNo(blockId), getPartitionId(blockId), getTaskAttemptId(blockId));
+  }
+
+  public BlockId asBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
+    return new BlockId(
+        getBlockId(sequenceNo, partitionId, taskAttemptId),
+        this,
+        sequenceNo,
+        partitionId,
+        (int) taskAttemptId);
+  }
+
+  public static BlockIdLayout from(RssConf rssConf) {
+    int sequenceBits = rssConf.get(RssClientConf.BLOCKID_SEQUENCE_NO_BITS);
+    int partitionBits = rssConf.get(RssClientConf.BLOCKID_PARTITION_ID_BITS);
+    int attemptBits = rssConf.get(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS);
+    return BlockIdLayout.from(sequenceBits, partitionBits, attemptBits);
+  }
+
+  public static BlockIdLayout from(int sequenceNoBits, int partitionIdBits, int taskAttemptIdBits) {
+    return new BlockIdLayout(sequenceNoBits, partitionIdBits, taskAttemptIdBits);
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index a361eab..8c769f7 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -27,17 +27,6 @@
   public static final String COORDINATOR_TAG = "coordinator";
   public static final String SHUFFLE_DATA_FILE_SUFFIX = ".data";
   public static final String SHUFFLE_INDEX_FILE_SUFFIX = ".index";
-  // BlockId is long and consist of partitionId, taskAttemptId, atomicInt
-  // the length of them are ATOMIC_INT_MAX_LENGTH + PARTITION_ID_MAX_LENGTH +
-  // TASK_ATTEMPT_ID_MAX_LENGTH = 63, each of these lengths must be less than 32
-  // do not access the fields, no need to implement your own bit manipulation logic,
-  // better use methods provided by org.apache.uniffle.common.util.BlockId
-  public static final int PARTITION_ID_MAX_LENGTH = 24;
-  public static final int TASK_ATTEMPT_ID_MAX_LENGTH = 21;
-  public static final int ATOMIC_INT_MAX_LENGTH = 18;
-  public static final int MAX_SEQUENCE_NO = (1 << Constants.ATOMIC_INT_MAX_LENGTH) - 1;
-  public static final int MAX_PARTITION_ID = (1 << Constants.PARTITION_ID_MAX_LENGTH) - 1;
-  public static final int MAX_TASK_ATTEMPT_ID = (1 << Constants.TASK_ATTEMPT_ID_MAX_LENGTH) - 1;
   public static final long INVALID_BLOCK_ID = -1L;
 
   public static final String KEY_SPLIT_CHAR = "/";
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 6d4e230..7026320 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -329,7 +329,10 @@
   }
 
   public static Map<Integer, Roaring64NavigableMap> generatePartitionToBitmap(
-      Roaring64NavigableMap shuffleBitmap, int startPartition, int endPartition) {
+      Roaring64NavigableMap shuffleBitmap,
+      int startPartition,
+      int endPartition,
+      BlockIdLayout layout) {
     Map<Integer, Roaring64NavigableMap> result = Maps.newHashMap();
     for (int partitionId = startPartition; partitionId < endPartition; partitionId++) {
       result.computeIfAbsent(partitionId, key -> Roaring64NavigableMap.bitmapOf());
@@ -337,7 +340,7 @@
     Iterator<Long> it = shuffleBitmap.iterator();
     while (it.hasNext()) {
       Long blockId = it.next();
-      int partitionId = BlockId.getPartitionId(blockId);
+      int partitionId = layout.getPartitionId(blockId);
       if (partitionId >= startPartition && partitionId < endPartition) {
         result.get(partitionId).add(blockId);
       }
diff --git a/common/src/test/java/org/apache/uniffle/common/BufferSegmentTest.java b/common/src/test/java/org/apache/uniffle/common/BufferSegmentTest.java
index 49a1c58..ca10485 100644
--- a/common/src/test/java/org/apache/uniffle/common/BufferSegmentTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/BufferSegmentTest.java
@@ -58,7 +58,7 @@
   public void testToString() {
     BufferSegment segment = new BufferSegment(0, 1, 2, 3, 4, 5);
     assertEquals(
-        "BufferSegment{blockId[0 (seq: 0, part: 0, task: 0)], taskAttemptId[5], offset[1], length[2], crc[4], uncompressLength[3]}",
+        "BufferSegment{blockId[0], taskAttemptId[5], offset[1], length[2], crc[4], uncompressLength[3]}",
         segment.toString());
   }
 
diff --git a/common/src/test/java/org/apache/uniffle/common/ShuffleBlockInfoTest.java b/common/src/test/java/org/apache/uniffle/common/ShuffleBlockInfoTest.java
index fdac0e9..71db702 100644
--- a/common/src/test/java/org/apache/uniffle/common/ShuffleBlockInfoTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ShuffleBlockInfoTest.java
@@ -22,8 +22,6 @@
 
 import org.junit.jupiter.api.Test;
 
-import org.apache.uniffle.common.util.BlockId;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class ShuffleBlockInfoTest {
@@ -39,9 +37,9 @@
             + info.getShuffleId()
             + "],partitionId["
             + info.getPartitionId()
-            + "],"
-            + BlockId.toString(info.getBlockId())
-            + ",length["
+            + "],blockId["
+            + info.getBlockId()
+            + "],length["
             + info.getLength()
             + "],uncompressLength["
             + info.getUncompressLength()
@@ -56,9 +54,9 @@
             + info2.getShuffleId()
             + "],partitionId["
             + info2.getPartitionId()
-            + "],"
-            + BlockId.toString(info2.getBlockId())
-            + ",length["
+            + "],blockId["
+            + info2.getBlockId()
+            + "],length["
             + info2.getLength()
             + "],uncompressLength["
             + info2.getUncompressLength()
diff --git a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
index 7f402e5..3f894ab 100644
--- a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
@@ -23,7 +23,6 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 
-import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.ByteBufUtils;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -71,9 +70,9 @@
   public void testToString() {
     ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 2, 3, 4, 5, new byte[6]);
     assertEquals(
-        "ShufflePartitionedBlock{"
-            + BlockId.toString(b1.getBlockId())
-            + ", length["
+        "ShufflePartitionedBlock{blockId["
+            + b1.getBlockId()
+            + "], length["
             + b1.getLength()
             + "], uncompressLength["
             + b1.getUncompressLength()
diff --git a/common/src/test/java/org/apache/uniffle/common/util/BlockIdLayoutTest.java b/common/src/test/java/org/apache/uniffle/common/util/BlockIdLayoutTest.java
new file mode 100644
index 0000000..e62a43e
--- /dev/null
+++ b/common/src/test/java/org/apache/uniffle/common/util/BlockIdLayoutTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class BlockIdLayoutTest {
+  @Test
+  public void testFromLengths() {
+    BlockIdLayout layout = BlockIdLayout.from(20, 21, 22);
+
+    assertEquals(20, layout.sequenceNoBits);
+    assertEquals(21, layout.partitionIdBits);
+    assertEquals(22, layout.taskAttemptIdBits);
+
+    assertEquals(43, layout.sequenceNoOffset);
+    assertEquals(22, layout.partitionIdOffset);
+    assertEquals(0, layout.taskAttemptIdOffset);
+
+    assertEquals(1048575, layout.maxSequenceNo);
+    assertEquals(2097151, layout.maxPartitionId);
+    assertEquals(4194303, layout.maxTaskAttemptId);
+
+    assertEquals(2097152, layout.maxNumPartitions);
+
+    assertEquals(1048575L << (21 + 22), layout.sequenceNoMask);
+    assertEquals(2097151L << 22, layout.partitionIdMask);
+    assertEquals(4194303L, layout.taskAttemptIdMask);
+
+    assertEquals("blockIdLayout[seq: 20 bits, part: 21 bits, task: 22 bits]", layout.toString());
+  }
+
+  @Test
+  public void testFromLengthsErrors() {
+    final Throwable e1 =
+        assertThrows(IllegalArgumentException.class, () -> BlockIdLayout.from(21, 21, 20));
+    assertEquals(
+        "Don't support given lengths, sum must be exactly 63: 21 + 21 + 20 = 62", e1.getMessage());
+    final Throwable e2 =
+        assertThrows(IllegalArgumentException.class, () -> BlockIdLayout.from(21, 21, 22));
+    assertEquals(
+        "Don't support given lengths, sum must be exactly 63: 21 + 21 + 22 = 64", e2.getMessage());
+
+    final Throwable e3 =
+        assertThrows(IllegalArgumentException.class, () -> BlockIdLayout.from(32, 16, 16));
+    assertEquals(
+        "Don't support given lengths, individual lengths must be less that 32: "
+            + "given sequenceNoBits=32, partitionIdBits=16, taskAttemptIdBits=16",
+        e3.getMessage());
+    final Throwable e4 =
+        assertThrows(IllegalArgumentException.class, () -> BlockIdLayout.from(16, 32, 16));
+    assertEquals(
+        "Don't support given lengths, individual lengths must be less that 32: "
+            + "given sequenceNoBits=16, partitionIdBits=32, taskAttemptIdBits=16",
+        e4.getMessage());
+    final Throwable e5 =
+        assertThrows(IllegalArgumentException.class, () -> BlockIdLayout.from(16, 16, 32));
+    assertEquals(
+        "Don't support given lengths, individual lengths must be less that 32: "
+            + "given sequenceNoBits=16, partitionIdBits=16, taskAttemptIdBits=32",
+        e5.getMessage());
+
+    final Throwable e6 =
+        assertThrows(IllegalArgumentException.class, () -> BlockIdLayout.from(22, 21, 21));
+    assertEquals(
+        "Don't support given lengths, sum must be exactly 63: 22 + 21 + 21 = 64", e6.getMessage());
+    final Throwable e7 =
+        assertThrows(IllegalArgumentException.class, () -> BlockIdLayout.from(21, 22, 21));
+    assertEquals(
+        "Don't support given lengths, sum must be exactly 63: 21 + 22 + 21 = 64", e7.getMessage());
+    final Throwable e8 =
+        assertThrows(IllegalArgumentException.class, () -> BlockIdLayout.from(21, 21, 22));
+    assertEquals(
+        "Don't support given lengths, sum must be exactly 63: 21 + 21 + 22 = 64", e8.getMessage());
+  }
+
+  public static Stream<Arguments> testBlockIdLayouts() {
+    return Stream.of(
+        Arguments.of(BlockIdLayout.DEFAULT),
+        Arguments.of(BlockIdLayout.from(21, 21, 21)),
+        Arguments.of(BlockIdLayout.from(12, 24, 27)),
+        Arguments.of(BlockIdLayout.from(1, 31, 31)),
+        Arguments.of(BlockIdLayout.from(31, 1, 31)),
+        Arguments.of(BlockIdLayout.from(31, 31, 1)));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testBlockIdLayouts")
+  public void testLayoutGetBlockId(BlockIdLayout layout) {
+    // max value of blockId
+    assertEquals(
+        (long) layout.maxSequenceNo << layout.sequenceNoOffset
+            | (long) layout.maxPartitionId << layout.partitionIdOffset
+            | (long) layout.maxTaskAttemptId << layout.taskAttemptIdOffset,
+        layout.getBlockId(layout.maxSequenceNo, layout.maxPartitionId, layout.maxTaskAttemptId));
+
+    // min value of blockId
+    assertEquals(0L, layout.getBlockId(0, 0, 0));
+
+    // just a random test
+    if (layout.sequenceNoBits > 7 && layout.partitionIdBits > 6 && layout.taskAttemptIdBits > 7) {
+      long blockId = layout.getBlockId(123, 45, 67);
+      assertEquals(
+          123L << layout.sequenceNoOffset
+              | 45L << layout.partitionIdOffset
+              | 67L << layout.taskAttemptIdOffset,
+          blockId);
+
+      assertEquals(123, layout.getSequenceNo(blockId));
+      assertEquals(45, layout.getPartitionId(blockId));
+      assertEquals(67, layout.getTaskAttemptId(blockId));
+
+      assertEquals(blockId, layout.asBlockId(blockId).blockId);
+      assertEquals(123, layout.asBlockId(blockId).sequenceNo);
+      assertEquals(45, layout.asBlockId(blockId).partitionId);
+      assertEquals(67, layout.asBlockId(blockId).taskAttemptId);
+      assertEquals(layout, layout.asBlockId(blockId).layout);
+    }
+
+    final Throwable e1 =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> layout.getBlockId(layout.maxSequenceNo + 1, 0, 0));
+    assertEquals(
+        "Don't support sequenceNo["
+            + (layout.maxSequenceNo + 1)
+            + "], "
+            + "the max value should be "
+            + layout.maxSequenceNo,
+        e1.getMessage());
+
+    final Throwable e2 =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> layout.getBlockId(0, layout.maxPartitionId + 1, 0));
+    assertEquals(
+        "Don't support partitionId["
+            + (layout.maxPartitionId + 1)
+            + "], "
+            + "the max value should be "
+            + layout.maxPartitionId,
+        e2.getMessage());
+
+    final Throwable e3 =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> layout.getBlockId(0, 0, layout.maxTaskAttemptId + 1));
+    assertEquals(
+        "Don't support taskAttemptId["
+            + (layout.maxTaskAttemptId + 1)
+            + "], "
+            + "the max value should be "
+            + layout.maxTaskAttemptId,
+        e3.getMessage());
+  }
+
+  @Test
+  public void testEquals() {
+    BlockIdLayout layout1 = BlockIdLayout.from(20, 21, 22);
+    BlockIdLayout layout2 = BlockIdLayout.from(20, 21, 22);
+    BlockIdLayout layout3 = BlockIdLayout.from(18, 22, 23);
+
+    assertEquals(layout1, layout1);
+    assertEquals(layout1, layout2);
+    assertNotEquals(layout1, layout3);
+
+    BlockIdLayout layout4 = BlockIdLayout.from(18, 24, 21);
+    assertNotEquals(layout1, layout4);
+    assertEquals(layout4, BlockIdLayout.DEFAULT);
+  }
+}
diff --git a/common/src/test/java/org/apache/uniffle/common/util/BlockIdTest.java b/common/src/test/java/org/apache/uniffle/common/util/BlockIdTest.java
index e42717d..50dc43f 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/BlockIdTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/BlockIdTest.java
@@ -20,47 +20,39 @@
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
 
 public class BlockIdTest {
+  BlockIdLayout layout1 = BlockIdLayout.DEFAULT;
+  BlockId blockId1 = layout1.asBlockId(1, 2, 3);
+  BlockId blockId2 = layout1.asBlockId(15, 30, 63);
+
+  BlockIdLayout layout2 = BlockIdLayout.from(31, 16, 16);
+  BlockId blockId3 = layout2.asBlockId(1, 2, 3);
+  BlockId blockId4 = layout2.asBlockId(15, 30, 63);
+
   @Test
-  public void test() {
-    // max value of blockId
-    assertEquals(854558029292503039L, BlockId.getBlockId(24287, 16777215, 1048575));
-    // just a random test
-    assertEquals(3518437418598500L, BlockId.getBlockId(100, 100, 100));
-    // min value of blockId
-    assertEquals(0L, BlockId.getBlockId(0, 0, 0));
+  public void testToString() {
+    assertEquals("blockId[200000400003 (seq: 1, part: 2, task: 3)]", blockId1.toString());
+    assertEquals("blockId[1e00003c0003f (seq: 15, part: 30, task: 63)]", blockId2.toString());
+    assertEquals("blockId[100020003 (seq: 1, part: 2, task: 3)]", blockId3.toString());
+    assertEquals("blockId[f001e003f (seq: 15, part: 30, task: 63)]", blockId4.toString());
+  }
 
-    BlockId blockId = BlockId.fromIds(100, 100, 100);
-    assertEquals("blockId[c80000c800064 (seq: 100, part: 100, task: 100)]", blockId.toString());
+  @Test
+  public void testEquals() {
+    assertEquals(blockId1, blockId1);
+    assertSame(blockId1, blockId1);
 
-    final Throwable e1 =
-        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(262144, 0, 0));
-    assertEquals("Can't support sequence[262144], the max value should be 262143", e1.getMessage());
+    assertNotEquals(blockId1, blockId2);
+    assertNotEquals(blockId1, blockId3);
+    assertNotEquals(blockId1, blockId4);
+    assertNotEquals(blockId2, blockId4);
 
-    final Throwable e2 =
-        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(0, 16777216, 0));
-    assertEquals(
-        "Can't support partitionId[16777216], the max value should be 16777215", e2.getMessage());
-
-    final Throwable e3 =
-        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(0, 0, 2097152));
-    assertEquals(
-        "Can't support taskAttemptId[2097152], the max value should be 2097151", e3.getMessage());
-
-    final Throwable e4 =
-        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(-1, 0, 0));
-    assertEquals("Can't support sequence[-1], the max value should be 262143", e4.getMessage());
-
-    final Throwable e5 =
-        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(0, -1, 0));
-    assertEquals(
-        "Can't support partitionId[-1], the max value should be 16777215", e5.getMessage());
-
-    final Throwable e6 =
-        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(0, 0, -1));
-    assertEquals(
-        "Can't support taskAttemptId[-1], the max value should be 2097151", e6.getMessage());
+    BlockId blockId4 = layout1.asBlockId(1, 2, 3);
+    assertEquals(blockId1, blockId4);
+    assertNotSame(blockId1, blockId4);
   }
 }
diff --git a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index d8457ae..d9bfddb 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -30,12 +30,16 @@
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
+import java.util.stream.Stream;
 import javax.net.ServerSocketFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.common.ShuffleServerInfo;
@@ -225,26 +229,32 @@
     assertEquals(testStr, extsObjs.get(0).get());
   }
 
-  @Test
-  public void testShuffleBitmapToPartitionBitmap() {
+  public static Stream<Arguments> testBlockIdLayouts() {
+    return Stream.of(
+        Arguments.of(BlockIdLayout.DEFAULT), Arguments.of(BlockIdLayout.from(20, 21, 22)));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testBlockIdLayouts")
+  public void testShuffleBitmapToPartitionBitmap(BlockIdLayout layout) {
     Roaring64NavigableMap partition1Bitmap =
         Roaring64NavigableMap.bitmapOf(
-            BlockId.getBlockId(0, 0, 0),
-            BlockId.getBlockId(1, 0, 0),
-            BlockId.getBlockId(0, 0, 1),
-            BlockId.getBlockId(1, 0, 1));
+            layout.getBlockId(0, 0, 0),
+            layout.getBlockId(1, 0, 0),
+            layout.getBlockId(0, 0, 1),
+            layout.getBlockId(1, 0, 1));
     Roaring64NavigableMap partition2Bitmap =
         Roaring64NavigableMap.bitmapOf(
-            BlockId.getBlockId(0, 1, 0),
-            BlockId.getBlockId(1, 1, 0),
-            BlockId.getBlockId(0, 1, 1),
-            BlockId.getBlockId(1, 1, 1));
+            layout.getBlockId(0, 1, 0),
+            layout.getBlockId(1, 1, 0),
+            layout.getBlockId(0, 1, 1),
+            layout.getBlockId(1, 1, 1));
     Roaring64NavigableMap shuffleBitmap = Roaring64NavigableMap.bitmapOf();
     shuffleBitmap.or(partition1Bitmap);
     shuffleBitmap.or(partition2Bitmap);
     assertEquals(8, shuffleBitmap.getLongCardinality());
     Map<Integer, Roaring64NavigableMap> toPartitionBitmap =
-        RssUtils.generatePartitionToBitmap(shuffleBitmap, 0, 2);
+        RssUtils.generatePartitionToBitmap(shuffleBitmap, 0, 2, layout);
     assertEquals(2, toPartitionBitmap.size());
     assertEquals(partition1Bitmap, toPartitionBitmap.get(0));
     assertEquals(partition2Bitmap, toPartitionBitmap.get(1));
diff --git a/docs/client_guide/spark_client_guide.md b/docs/client_guide/spark_client_guide.md
index 528da39..779d92e 100644
--- a/docs/client_guide/spark_client_guide.md
+++ b/docs/client_guide/spark_client_guide.md
@@ -88,6 +88,35 @@
 | spark.rss.client.remote.storage.useLocalConfAsDefault | false   | This option is only valid when the remote storage path is specified. If ture, the remote storage conf will use the client side hadoop configuration loaded from the classpath |
 | spark.rss.hadoop.*                                    | -       | The prefix key for Hadoop conf. For Spark like that: `spark.rss.hadoop.fs.defaultFS=hdfs://rbf-x1`, this will be as `fs.defaultFS=hdfs://rbf-x1` for Hadoop storage           |
 
+### Block id bits
+
+If you observe an error like
+
+    Don't support sequenceNo[…], the max value should be …
+    Don't support partitionId[…], the max value should be …
+    Don't support taskAttemptId[…], the max value should be …
+    Observing attempt number … while maxFailures is set to ….
+    Observing mapIndex[…] that would produce a taskAttemptId with … bits which is larger than the allowed … bits (maxFailures[…], speculation[…]). Please consider providing more bits for taskAttemptIds.
+    Cannot register shuffle with … partitions because the configured block id layout supports at most … partitions.
+
+you should consider increasing the bits reserved in the blockId for that number / id (while decreasing the other number of bits).
+
+The bits reserved for sequence number, partition id and task attempt id are best specified for Spark clients as follows:
+
+1. Reserve the bits required to support the largest number of partitions that you anticipate. Pick `ceil( log(max number of partitions) / log(2) )` bits.
+   For instance, `20` bits support `1,048,576` partitions.
+2. The number of bits for the task attempt ids should be `partitionIdBits + ceil( log(max attempts) / log(2))`,
+   where `max attempts` is set via Spark conf `spark.task.maxFailures` (default is `4`). In the presence of
+   speculative execution enabled via Spark conf `spark.speculation` (default is false), that `max attempts` has to be incremented by one.
+   For example: `22` bits is sufficient for `taskAttemptIdBits` with `partitionIdBits=20`, and Spark conf `spark.task.maxFailures=4` and `spark.speculation=false`.
+3. Reserve the remaining bits to `sequenceNoBits`: `sequenceNoBits = 63 - partitionIdBits - taskAttemptIdBits`.
+
+| Property Name                       | Default | Description                                                                                                                                                         |
+|-------------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| spark.rss.blockId.sequenceNoBits    | 18      | Number of bits reserved in the blockId for the sequence number (`[1..31]`). Note that `sequenceNoBits + partitionIdBits + taskAttemptIdBits` has to sum up to `63`. |
+| spark.rss.blockId.partitionIdBits   | 24      | Number of bits reserved in the blockId for the partition id (`[1..31]`). Note that `sequenceNoBits + partitionIdBits + taskAttemptIdBits` has to sum up to `63`.    |
+| spark.rss.blockId.taskAttemptIdBits | 21      | Number of bits reserved in the blockId for the task attempt id (`[1..31]`). Note that `sequenceNoBits + partitionIdBits + taskAttemptIdBits` has to sum up to `63`. |
+
 ### Adaptive Remote Shuffle Enabling 
 Currently, this feature only supports Spark. 
 
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
index ab1c655..b53df0d 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
@@ -40,11 +40,11 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.segment.FixedSizeSegmentSplitter;
 import org.apache.uniffle.common.segment.SegmentSplitter;
-import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ChecksumUtils;
 
 public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
-
+  private static BlockIdLayout LAYOUT = BlockIdLayout.DEFAULT;
   private static AtomicInteger ATOMIC_INT = new AtomicInteger(0);
   public static List<ShuffleServerInfo> mockSSI =
       Lists.newArrayList(new ShuffleServerInfo("id", "host", 0));
@@ -64,7 +64,7 @@
       new Random().nextBytes(buf);
       int seqno = ATOMIC_INT.getAndIncrement();
 
-      long blockId = BlockId.getBlockId(seqno, 0, taskAttemptId);
+      long blockId = LAYOUT.getBlockId(seqno, 0, taskAttemptId);
       blockIdBitmap.addLong(blockId);
       dataMap.put(blockId, buf);
       shuffleBlockInfoList.add(
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 98da32c..ea598e4 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -74,8 +74,8 @@
 import org.apache.uniffle.common.metrics.TestUtils;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
-import org.apache.uniffle.common.util.BlockId;
-import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.proto.RssProtos;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
@@ -99,6 +99,7 @@
   private static ShuffleServerConf grpcShuffleServerConfig;
   private static ShuffleServerConf nettyShuffleServerConfig;
   private final AtomicInteger atomicInteger = new AtomicInteger(0);
+  private static final BlockIdLayout layout = BlockIdLayout.DEFAULT;
   private static final Long EVENT_THRESHOLD_SIZE = 2048L;
   private static final int GB = 1024 * 1024 * 1024;
   protected static final long FAILED_REQUIRE_ID = -1;
@@ -267,7 +268,8 @@
       assertTrue(e.getMessage().contains("error happened when report shuffle result"));
     }
 
-    RssGetShuffleResultRequest req = new RssGetShuffleResultRequest("shuffleResultTest", 1, 1);
+    RssGetShuffleResultRequest req =
+        new RssGetShuffleResultRequest("shuffleResultTest", 1, 1, layout);
     try {
       grpcShuffleServerClient.getShuffleResult(req);
       fail("Exception should be thrown");
@@ -280,7 +282,7 @@
             "shuffleResultTest", 100, Lists.newArrayList(new PartitionRange(0, 1)), "");
     grpcShuffleServerClient.registerShuffle(rrsr);
 
-    req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 1);
+    req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 1, layout);
     RssGetShuffleResultResponse result = grpcShuffleServerClient.getShuffleResult(req);
     Roaring64NavigableMap blockIdBitmap = result.getBlockIdBitmap();
     assertEquals(Roaring64NavigableMap.bitmapOf(), blockIdBitmap);
@@ -288,21 +290,21 @@
     request = new RssReportShuffleResultRequest("shuffleResultTest", 0, 0L, partitionToBlockIds, 1);
     RssReportShuffleResultResponse response = grpcShuffleServerClient.reportShuffleResult(request);
     assertEquals(StatusCode.SUCCESS, response.getStatusCode());
-    req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 1);
+    req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 1, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
     Roaring64NavigableMap expectedP1 = Roaring64NavigableMap.bitmapOf();
     addExpectedBlockIds(expectedP1, blockIds1);
     assertEquals(expectedP1, blockIdBitmap);
 
-    req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 2);
+    req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 2, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
     Roaring64NavigableMap expectedP2 = Roaring64NavigableMap.bitmapOf();
     addExpectedBlockIds(expectedP2, blockIds2);
     assertEquals(expectedP2, blockIdBitmap);
 
-    req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 3);
+    req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 3, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
     Roaring64NavigableMap expectedP3 = Roaring64NavigableMap.bitmapOf();
@@ -320,19 +322,19 @@
     request = new RssReportShuffleResultRequest("shuffleResultTest", 0, 1L, partitionToBlockIds, 1);
     grpcShuffleServerClient.reportShuffleResult(request);
 
-    req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 1);
+    req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 1, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
     addExpectedBlockIds(expectedP1, blockIds1);
     assertEquals(expectedP1, blockIdBitmap);
 
-    req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 2);
+    req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 2, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
     addExpectedBlockIds(expectedP2, blockIds2);
     assertEquals(expectedP2, blockIdBitmap);
 
-    req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 3);
+    req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 3, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
     addExpectedBlockIds(expectedP3, blockIds3);
@@ -340,7 +342,7 @@
 
     request = new RssReportShuffleResultRequest("shuffleResultTest", 1, 1L, Maps.newHashMap(), 1);
     grpcShuffleServerClient.reportShuffleResult(request);
-    req = new RssGetShuffleResultRequest("shuffleResultTest", 1, 1);
+    req = new RssGetShuffleResultRequest("shuffleResultTest", 1, 1, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
     assertEquals(Roaring64NavigableMap.bitmapOf(), blockIdBitmap);
@@ -365,21 +367,21 @@
             .get(2);
     assertEquals(3, bitmaps.length);
 
-    req = new RssGetShuffleResultRequest("shuffleResultTest", 2, 1);
+    req = new RssGetShuffleResultRequest("shuffleResultTest", 2, 1, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
     expectedP1 = Roaring64NavigableMap.bitmapOf();
     addExpectedBlockIds(expectedP1, blockIds1);
     assertEquals(expectedP1, blockIdBitmap);
 
-    req = new RssGetShuffleResultRequest("shuffleResultTest", 2, 2);
+    req = new RssGetShuffleResultRequest("shuffleResultTest", 2, 2, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
     expectedP2 = Roaring64NavigableMap.bitmapOf();
     addExpectedBlockIds(expectedP2, blockIds2);
     assertEquals(expectedP2, blockIdBitmap);
 
-    req = new RssGetShuffleResultRequest("shuffleResultTest", 2, 3);
+    req = new RssGetShuffleResultRequest("shuffleResultTest", 2, 3, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
     expectedP3 = Roaring64NavigableMap.bitmapOf();
@@ -387,40 +389,54 @@
     assertEquals(expectedP3, blockIdBitmap);
 
     partitionToBlockIds = Maps.newHashMap();
-    blockIds1 = getBlockIdList(Constants.MAX_PARTITION_ID, 3);
+    blockIds1 = getBlockIdList(layout.maxPartitionId, 3);
     blockIds2 = getBlockIdList(2, 2);
     blockIds3 = getBlockIdList(3, 1);
-    partitionToBlockIds.put(Constants.MAX_PARTITION_ID, blockIds1);
+    partitionToBlockIds.put(layout.maxPartitionId, blockIds1);
     partitionToBlockIds.put(2, blockIds2);
     partitionToBlockIds.put(3, blockIds3);
     // bimapNum = 2
     request = new RssReportShuffleResultRequest("shuffleResultTest", 4, 1L, partitionToBlockIds, 2);
     grpcShuffleServerClient.reportShuffleResult(request);
 
-    req = new RssGetShuffleResultRequest("shuffleResultTest", 4, Constants.MAX_PARTITION_ID);
+    req = new RssGetShuffleResultRequest("shuffleResultTest", 4, layout.maxPartitionId, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
     expectedP1 = Roaring64NavigableMap.bitmapOf();
     addExpectedBlockIds(expectedP1, blockIds1);
     assertEquals(expectedP1, blockIdBitmap);
 
-    req = new RssGetShuffleResultRequest("shuffleResultTest", 4, 2);
+    req = new RssGetShuffleResultRequest("shuffleResultTest", 4, 2, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
     expectedP2 = Roaring64NavigableMap.bitmapOf();
     addExpectedBlockIds(expectedP2, blockIds2);
     assertEquals(expectedP2, blockIdBitmap);
 
-    req = new RssGetShuffleResultRequest("shuffleResultTest", 4, 3);
+    req = new RssGetShuffleResultRequest("shuffleResultTest", 4, 3, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
     expectedP3 = Roaring64NavigableMap.bitmapOf();
     addExpectedBlockIds(expectedP3, blockIds3);
     assertEquals(expectedP3, blockIdBitmap);
 
+    // get same shuffle result without block id layout (legacy clients)
+    RssProtos.GetShuffleResultRequest rpcRequest =
+        RssProtos.GetShuffleResultRequest.newBuilder()
+            .setAppId("shuffleResultTest")
+            .setShuffleId(4)
+            .setPartitionId(3)
+            // deliberately not setting block id layout through .setBlockIdLayout
+            .build();
+    RssProtos.GetShuffleResultResponse rpcResponse =
+        grpcShuffleServerClient.getBlockingStub().getShuffleResult(rpcRequest);
+    assertEquals(RssProtos.StatusCode.SUCCESS, rpcResponse.getStatus());
+    blockIdBitmap = RssUtils.deserializeBitMap(rpcResponse.getSerializedBitmap().toByteArray());
+    assertEquals(expectedP3, blockIdBitmap);
+
     // wait resources are deleted
     Thread.sleep(12000);
-    req = new RssGetShuffleResultRequest("shuffleResultTest", 1, 1);
+    req = new RssGetShuffleResultRequest("shuffleResultTest", 1, 1, layout);
     try {
       grpcShuffleServerClient.getShuffleResult(req);
       fail("Exception should be thrown");
@@ -434,10 +450,10 @@
     grpcShuffleServerClient.registerShuffle(
         new RssRegisterShuffleRequest(
             "registerTest", 0, Lists.newArrayList(new PartitionRange(0, 1)), ""));
-    RssGetShuffleResultRequest req = new RssGetShuffleResultRequest("registerTest", 0, 0);
+    RssGetShuffleResultRequest req = new RssGetShuffleResultRequest("registerTest", 0, 0, layout);
     // no exception with getShuffleResult means register successfully
     grpcShuffleServerClient.getShuffleResult(req);
-    req = new RssGetShuffleResultRequest("registerTest", 0, 1);
+    req = new RssGetShuffleResultRequest("registerTest", 0, 1, layout);
     grpcShuffleServerClient.getShuffleResult(req);
     grpcShuffleServerClient.registerShuffle(
         new RssRegisterShuffleRequest(
@@ -446,11 +462,11 @@
             Lists.newArrayList(
                 new PartitionRange(0, 0), new PartitionRange(1, 1), new PartitionRange(2, 2)),
             ""));
-    req = new RssGetShuffleResultRequest("registerTest", 1, 0);
+    req = new RssGetShuffleResultRequest("registerTest", 1, 0, layout);
     grpcShuffleServerClient.getShuffleResult(req);
-    req = new RssGetShuffleResultRequest("registerTest", 1, 1);
+    req = new RssGetShuffleResultRequest("registerTest", 1, 1, layout);
     grpcShuffleServerClient.getShuffleResult(req);
-    req = new RssGetShuffleResultRequest("registerTest", 1, 2);
+    req = new RssGetShuffleResultRequest("registerTest", 1, 2, layout);
     grpcShuffleServerClient.getShuffleResult(req);
     // registerShuffle with remote storage
     String appId1 = "remote_storage_register_app1";
@@ -668,8 +684,14 @@
     }
   }
 
-  @Test
-  public void multipleShuffleResultTest() throws Exception {
+  public static Stream<Arguments> testBlockIdLayouts() {
+    return Stream.of(
+        Arguments.of(BlockIdLayout.DEFAULT), Arguments.of(BlockIdLayout.from(20, 21, 22)));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testBlockIdLayouts")
+  public void multipleShuffleResultTest(BlockIdLayout layout) throws Exception {
     Set<Long> expectedBlockIds = Sets.newConcurrentHashSet();
     RssRegisterShuffleRequest rrsr =
         new RssRegisterShuffleRequest(
@@ -681,7 +703,7 @@
           for (int i = 0; i < 100; i++) {
             Map<Integer, List<Long>> ptbs = Maps.newHashMap();
             List<Long> blockIds = Lists.newArrayList();
-            Long blockId = BlockId.getBlockId(i, 1, 0);
+            Long blockId = layout.getBlockId(i, 1, 0);
             expectedBlockIds.add(blockId);
             blockIds.add(blockId);
             ptbs.put(1, blockIds);
@@ -695,7 +717,7 @@
           for (int i = 100; i < 200; i++) {
             Map<Integer, List<Long>> ptbs = Maps.newHashMap();
             List<Long> blockIds = Lists.newArrayList();
-            Long blockId = BlockId.getBlockId(i, 1, 1);
+            Long blockId = layout.getBlockId(i, 1, 1);
             expectedBlockIds.add(blockId);
             blockIds.add(blockId);
             ptbs.put(1, blockIds);
@@ -709,7 +731,7 @@
           for (int i = 200; i < 300; i++) {
             Map<Integer, List<Long>> ptbs = Maps.newHashMap();
             List<Long> blockIds = Lists.newArrayList();
-            Long blockId = BlockId.getBlockId(i, 1, 2);
+            Long blockId = layout.getBlockId(i, 1, 2);
             expectedBlockIds.add(blockId);
             blockIds.add(blockId);
             ptbs.put(1, blockIds);
@@ -734,7 +756,7 @@
     }
 
     RssGetShuffleResultRequest req =
-        new RssGetShuffleResultRequest("multipleShuffleResultTest", 1, 1);
+        new RssGetShuffleResultRequest("multipleShuffleResultTest", 1, 1, layout);
     RssGetShuffleResultResponse result = grpcShuffleServerClient.getShuffleResult(req);
     Roaring64NavigableMap actualBlockIdBitmap = result.getBlockIdBitmap();
     assertEquals(blockIdBitmap, actualBlockIdBitmap);
@@ -957,7 +979,8 @@
             .getCounterMap()
             .get(ShuffleServerGrpcMetrics.GET_SHUFFLE_RESULT_METHOD)
             .get();
-    grpcShuffleServerClient.getShuffleResult(new RssGetShuffleResultRequest(appId, shuffleId, 1));
+    grpcShuffleServerClient.getShuffleResult(
+        new RssGetShuffleResultRequest(appId, shuffleId, 1, layout));
     newValue =
         grpcShuffleServers
             .get(0)
@@ -1057,7 +1080,7 @@
   private List<Long> getBlockIdList(int partitionId, int blockNum) {
     List<Long> blockIds = Lists.newArrayList();
     for (int i = 0; i < blockNum; i++) {
-      blockIds.add(BlockId.getBlockId(atomicInteger.getAndIncrement(), partitionId, 0));
+      blockIds.add(layout.getBlockId(atomicInteger.getAndIncrement(), partitionId, 0));
     }
     return blockIds;
   }
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java
index 1d3ffc3..2803f05 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java
@@ -58,6 +58,7 @@
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.segment.LocalOrderSegmentSplitter;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServerConf;
@@ -314,8 +315,9 @@
             .filter(x -> expectedBlockIds4.contains(x.getKey()))
             .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue()));
     taskIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
     for (long blockId : expectedBlockIds4) {
-      taskIds.add(new DefaultIdHelper().getTaskAttemptId(blockId));
+      taskIds.add(new DefaultIdHelper(layout).getTaskAttemptId(blockId));
     }
     sdr =
         readShuffleData(
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 3a71596..c905968 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -45,7 +45,7 @@
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.ServerType;
-import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -187,6 +187,7 @@
   @Test
   public void reportBlocksToShuffleServerIfNecessary() {
     String testAppId = "reportBlocksToShuffleServerIfNecessary_appId";
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
 
     shuffleWriteClientImpl.registerShuffle(
         shuffleServerInfo1,
@@ -211,7 +212,7 @@
     Set<Long> blockIds = Sets.newHashSet();
     int partitionIdx = 1;
     for (int i = 0; i < 5; i++) {
-      blockIds.add(BlockId.getBlockId(i, partitionIdx, 0));
+      blockIds.add(layout.getBlockId(i, partitionIdx, 0));
     }
     partitionToBlocks.put(partitionIdx, blockIds);
     serverToPartitionToBlockIds.put(shuffleServerInfo1, partitionToBlocks);
@@ -234,6 +235,7 @@
   @Test
   public void reportMultipleServerTest() throws Exception {
     String testAppId = "reportMultipleServerTest";
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
 
     shuffleWriteClientImpl.registerShuffle(
         shuffleServerInfo1,
@@ -259,7 +261,7 @@
     Map<Integer, Set<Long>> partitionToBlocks1 = Maps.newHashMap();
     Set<Long> blockIds = Sets.newHashSet();
     for (int i = 0; i < 5; i++) {
-      blockIds.add(BlockId.getBlockId(i, 1, 0));
+      blockIds.add(layout.getBlockId(i, 1, 0));
     }
     partitionToBlocks1.put(1, blockIds);
     Map<ShuffleServerInfo, Map<Integer, Set<Long>>> serverToPartitionToBlockIds = Maps.newHashMap();
@@ -268,7 +270,7 @@
     Map<Integer, Set<Long>> partitionToBlocks2 = Maps.newHashMap();
     blockIds = Sets.newHashSet();
     for (int i = 0; i < 7; i++) {
-      blockIds.add(BlockId.getBlockId(i, 2, 0));
+      blockIds.add(layout.getBlockId(i, 2, 0));
     }
     partitionToBlocks2.put(2, blockIds);
     serverToPartitionToBlockIds.put(shuffleServerInfo2, partitionToBlocks2);
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
index 30aed50..6d797a8 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
@@ -18,8 +18,13 @@
 package org.apache.uniffle.test;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import scala.Option;
 
@@ -28,15 +33,30 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.shuffle.RssShuffleManager;
 import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.shuffle.ShuffleHandleInfo;
 import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
+import org.apache.uniffle.client.api.ShuffleWriteClient;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.util.RssClientConfig;
+import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.rpc.ServerType;
+import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.shuffle.manager.RssShuffleManagerBase;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -45,13 +65,32 @@
 public class RssShuffleManagerTest extends SparkIntegrationTestBase {
 
   @BeforeAll
-  public static void setupServers() throws Exception {
+  public static void beforeAll() throws Exception {
     shutdownServers();
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+  }
+
+  @AfterEach
+  public void after() throws Exception {
+    shutdownServers();
+  }
+
+  public static void startServers(BlockIdLayout dynamicConfLayout) throws Exception {
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
-    dynamicConf.put(
-        RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
+    dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
+    // configure block id layout (if set)
+    if (dynamicConfLayout != null) {
+      dynamicConf.put(
+          RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key(),
+          String.valueOf(dynamicConfLayout.sequenceNoBits));
+      dynamicConf.put(
+          RssClientConf.BLOCKID_PARTITION_ID_BITS.key(),
+          String.valueOf(dynamicConfLayout.partitionIdBits));
+      dynamicConf.put(
+          RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key(),
+          String.valueOf(dynamicConfLayout.taskAttemptIdBits));
+    }
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
     addDynamicConf(coordinatorConf, dynamicConf);
     createCoordinatorServer(coordinatorConf);
     createShuffleServer(getShuffleServerConf(ServerType.GRPC));
@@ -64,15 +103,73 @@
     return new HashMap();
   }
 
-  @Test
-  public void testRssShuffleManager() throws Exception {
+  private static final BlockIdLayout CUSTOM1 = BlockIdLayout.from(20, 21, 22);
+  private static final BlockIdLayout CUSTOM2 = BlockIdLayout.from(22, 18, 23);
+
+  public static Stream<Arguments> testBlockIdLayouts() {
+    return Stream.of(Arguments.of(CUSTOM1), Arguments.of(CUSTOM2));
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testRssShuffleManager(boolean enableDynamicClientConf) throws Exception {
+    doTestRssShuffleManager(null, null, BlockIdLayout.DEFAULT, enableDynamicClientConf);
+  }
+
+  @ParameterizedTest
+  @MethodSource("testBlockIdLayouts")
+  public void testRssShuffleManagerClientConf(BlockIdLayout layout) throws Exception {
+    doTestRssShuffleManager(layout, null, layout, true);
+  }
+
+  @ParameterizedTest
+  @MethodSource("testBlockIdLayouts")
+  @Disabled(
+      "Dynamic client conf not working for arguments used to create ShuffleWriteClient: issue #1554")
+  public void testRssShuffleManagerDynamicClientConf(BlockIdLayout layout) throws Exception {
+    doTestRssShuffleManager(null, layout, layout, true);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testRssShuffleManagerClientConfOverride(boolean enableDynamicClientConf)
+      throws Exception {
+    doTestRssShuffleManager(CUSTOM1, CUSTOM2, CUSTOM1, enableDynamicClientConf);
+  }
+
+  private void doTestRssShuffleManager(
+      BlockIdLayout clientConfLayout,
+      BlockIdLayout dynamicConfLayout,
+      BlockIdLayout expectedLayout,
+      boolean enableDynamicCLientConf)
+      throws Exception {
+    startServers(dynamicConfLayout);
+
     SparkConf conf = createSparkConf();
     updateSparkConfWithRss(conf);
     // enable stage recompute
     conf.set("spark." + RssClientConfig.RSS_RESUBMIT_STAGE, "true");
-    // disable remote storage fetch
-    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED, false);
+    // enable dynamic client conf
+    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED, enableDynamicCLientConf);
+    // configure storage type
     conf.set("spark." + RssClientConfig.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE.name());
+    // restarting the coordinator may cause RssException: There isn't enough shuffle servers
+    // retry quickly (default is 65s interval)
+    conf.set("spark." + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL, "1000");
+    conf.set("spark." + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES, "10");
+
+    // configure block id layout (if set)
+    if (clientConfLayout != null) {
+      conf.set(
+          "spark." + RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key(),
+          String.valueOf(clientConfLayout.sequenceNoBits));
+      conf.set(
+          "spark." + RssClientConf.BLOCKID_PARTITION_ID_BITS.key(),
+          String.valueOf(clientConfLayout.partitionIdBits));
+      conf.set(
+          "spark." + RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key(),
+          String.valueOf(clientConfLayout.taskAttemptIdBits));
+    }
 
     JavaSparkContext sc = null;
     try {
@@ -84,7 +181,50 @@
       // create a rdd that triggers shuffle registration
       long count = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2).groupBy(x -> x).count();
       assertEquals(5, count);
-      RssShuffleManager shuffleManager = (RssShuffleManager) SparkEnv.get().shuffleManager();
+      RssShuffleManagerBase shuffleManager =
+          (RssShuffleManagerBase) SparkEnv.get().shuffleManager();
+
+      // get written block ids (we know there is one shuffle where two task attempts wrote two
+      // partitions)
+      RssConf rssConf = RssSparkConfig.toRssConf(conf);
+      ShuffleWriteClient shuffleWriteClient =
+          ShuffleClientFactory.newWriteBuilder()
+              .clientType(ClientType.GRPC.name())
+              .retryMax(3)
+              .retryIntervalMax(2000)
+              .heartBeatThreadNum(4)
+              .replica(1)
+              .replicaWrite(1)
+              .replicaRead(1)
+              .replicaSkipEnabled(true)
+              .dataTransferPoolSize(1)
+              .dataCommitPoolSize(1)
+              .unregisterThreadPoolSize(10)
+              .unregisterRequestTimeSec(10)
+              .rssConf(rssConf)
+              .build();
+      ShuffleHandleInfo handle = shuffleManager.getShuffleHandleInfoByShuffleId(0);
+      Set<ShuffleServerInfo> servers =
+          handle.getPartitionToServers().values().stream()
+              .flatMap(Collection::stream)
+              .collect(Collectors.toSet());
+
+      for (int partitionId : new int[] {0, 1}) {
+        Roaring64NavigableMap blockIdLongs =
+            shuffleWriteClient.getShuffleResult(
+                ClientType.GRPC.name(), servers, shuffleManager.getAppId(), 0, partitionId);
+        List<BlockId> blockIds =
+            blockIdLongs.stream()
+                .sorted()
+                .mapToObj(expectedLayout::asBlockId)
+                .collect(Collectors.toList());
+        assertEquals(2, blockIds.size());
+        long taskAttemptId0 = shuffleManager.getTaskAttemptIdForBlockId(0, 0);
+        long taskAttemptId1 = shuffleManager.getTaskAttemptIdForBlockId(1, 0);
+        assertEquals(expectedLayout.asBlockId(0, partitionId, taskAttemptId0), blockIds.get(0));
+        assertEquals(expectedLayout.asBlockId(0, partitionId, taskAttemptId1), blockIds.get(1));
+      }
+
       shuffleManager.unregisterAllMapOutput(0);
       MapOutputTrackerMaster master = (MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();
       assertTrue(master.containsShuffle(0));
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index ae23e3e..c134ced 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -51,6 +51,7 @@
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServerConf;
@@ -157,12 +158,13 @@
   @MethodSource("isNettyModeProvider")
   public void readTest1(boolean isNettyMode) {
     String testAppId = "localReadTest1";
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
     registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     createTestData(testAppId, expectedData, blockIdBitmap, taskIdBitmap, isNettyMode);
-    blockIdBitmap.addLong(BlockId.getBlockId(0, 1, 0));
+    blockIdBitmap.addLong(layout.getBlockId(0, 1, 0));
     ShuffleReadClientImpl readClient;
     readClient =
         baseReadBuilder(isNettyMode)
@@ -311,6 +313,7 @@
   @MethodSource("isNettyModeProvider")
   public void readTest6(boolean isNettyMode) {
     String testAppId = "localReadTest6";
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
     registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
@@ -323,9 +326,9 @@
     Roaring64NavigableMap wrongBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
     LongIterator iter = blockIdBitmap.getLongIterator();
     while (iter.hasNext()) {
-      BlockId blockId = BlockId.fromLong(iter.next());
+      BlockId blockId = layout.asBlockId(iter.next());
       wrongBlockIdBitmap.addLong(
-          BlockId.getBlockId(blockId.sequenceNo, blockId.partitionId + 1, blockId.taskAttemptId));
+          layout.getBlockId(blockId.sequenceNo, blockId.partitionId + 1, blockId.taskAttemptId));
     }
 
     ShuffleReadClientImpl readClient =
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 95f092f..5a6919e 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -719,6 +719,12 @@
             .setAppId(request.getAppId())
             .setShuffleId(request.getShuffleId())
             .setPartitionId(request.getPartitionId())
+            .setBlockIdLayout(
+                RssProtos.BlockIdLayout.newBuilder()
+                    .setSequenceNoBits(request.getBlockIdLayout().sequenceNoBits)
+                    .setPartitionIdBits(request.getBlockIdLayout().partitionIdBits)
+                    .setTaskAttemptIdBits(request.getBlockIdLayout().taskAttemptIdBits)
+                    .build())
             .build();
     GetShuffleResultResponse rpcResponse = getBlockingStub().getShuffleResult(rpcRequest);
     RssProtos.StatusCode statusCode = rpcResponse.getStatus();
@@ -761,6 +767,12 @@
             .setAppId(request.getAppId())
             .setShuffleId(request.getShuffleId())
             .addAllPartitions(request.getPartitions())
+            .setBlockIdLayout(
+                RssProtos.BlockIdLayout.newBuilder()
+                    .setSequenceNoBits(request.getBlockIdLayout().sequenceNoBits)
+                    .setPartitionIdBits(request.getBlockIdLayout().partitionIdBits)
+                    .setTaskAttemptIdBits(request.getBlockIdLayout().taskAttemptIdBits)
+                    .build())
             .build();
     GetShuffleResultForMultiPartResponse rpcResponse =
         getBlockingStub().getShuffleResultForMultiPart(rpcRequest);
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleResultForMultiPartRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleResultForMultiPartRequest.java
index e051f84..ec8f460 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleResultForMultiPartRequest.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleResultForMultiPartRequest.java
@@ -19,16 +19,20 @@
 
 import java.util.Set;
 
+import org.apache.uniffle.common.util.BlockIdLayout;
+
 public class RssGetShuffleResultForMultiPartRequest {
   private String appId;
   private int shuffleId;
   private Set<Integer> partitions;
+  private BlockIdLayout blockIdLayout;
 
   public RssGetShuffleResultForMultiPartRequest(
-      String appId, int shuffleId, Set<Integer> partitions) {
+      String appId, int shuffleId, Set<Integer> partitions, BlockIdLayout blockIdLayout) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitions = partitions;
+    this.blockIdLayout = blockIdLayout;
   }
 
   public String getAppId() {
@@ -42,4 +46,8 @@
   public Set<Integer> getPartitions() {
     return partitions;
   }
+
+  public BlockIdLayout getBlockIdLayout() {
+    return blockIdLayout;
+  }
 }
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleResultRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleResultRequest.java
index aec3328..0d0796a 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleResultRequest.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetShuffleResultRequest.java
@@ -17,16 +17,21 @@
 
 package org.apache.uniffle.client.request;
 
+import org.apache.uniffle.common.util.BlockIdLayout;
+
 public class RssGetShuffleResultRequest {
 
   private String appId;
   private int shuffleId;
   private int partitionId;
+  private BlockIdLayout layout;
 
-  public RssGetShuffleResultRequest(String appId, int shuffleId, int partitionId) {
+  public RssGetShuffleResultRequest(
+      String appId, int shuffleId, int partitionId, BlockIdLayout layout) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
+    this.layout = layout;
   }
 
   public String getAppId() {
@@ -40,4 +45,8 @@
   public int getPartitionId() {
     return partitionId;
   }
+
+  public BlockIdLayout getBlockIdLayout() {
+    return layout;
+  }
 }
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 932c15e..ac9c762 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -143,6 +143,13 @@
   string appId = 1;
   int32 shuffleId = 2;
   int32 partitionId = 3;
+  BlockIdLayout blockIdLayout = 4;
+}
+
+message BlockIdLayout {
+  int32 sequenceNoBits = 1;
+  int32 partitionIdBits = 2;
+  int32 taskAttemptIdBits = 3;
 }
 
 message GetShuffleResultResponse {
@@ -155,6 +162,7 @@
   string appId = 1;
   int32 shuffleId = 2;
   repeated int32 partitions = 3;
+  BlockIdLayout blockIdLayout = 4;
 }
 
 message GetShuffleResultForMultiPartResponse {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index ce7d2d6..038e03b 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -51,6 +51,7 @@
 import org.apache.uniffle.common.exception.NoBufferForHugePartitionException;
 import org.apache.uniffle.common.exception.NoRegisterException;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ByteBufUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.proto.RssProtos;
@@ -528,6 +529,15 @@
     String appId = request.getAppId();
     int shuffleId = request.getShuffleId();
     int partitionId = request.getPartitionId();
+    BlockIdLayout blockIdLayout = BlockIdLayout.DEFAULT;
+    // legacy clients might send request without block id layout, we fall back to DEFAULT then
+    if (request.hasBlockIdLayout()) {
+      blockIdLayout =
+          BlockIdLayout.from(
+              request.getBlockIdLayout().getSequenceNoBits(),
+              request.getBlockIdLayout().getPartitionIdBits(),
+              request.getBlockIdLayout().getTaskAttemptIdBits());
+    }
     StatusCode status = StatusCode.SUCCESS;
     String msg = "OK";
     GetShuffleResultResponse reply;
@@ -540,7 +550,7 @@
       serializedBlockIds =
           shuffleServer
               .getShuffleTaskManager()
-              .getFinishedBlockIds(appId, shuffleId, Sets.newHashSet(partitionId));
+              .getFinishedBlockIds(appId, shuffleId, Sets.newHashSet(partitionId), blockIdLayout);
       if (serializedBlockIds == null) {
         status = StatusCode.INTERNAL_ERROR;
         msg = "Can't get shuffle result for " + requestInfo;
@@ -571,6 +581,15 @@
     String appId = request.getAppId();
     int shuffleId = request.getShuffleId();
     List<Integer> partitionsList = request.getPartitionsList();
+    BlockIdLayout blockIdLayout = BlockIdLayout.DEFAULT;
+    // legacy clients might send request without block id layout, we fall back to DEFAULT then
+    if (request.hasBlockIdLayout()) {
+      blockIdLayout =
+          BlockIdLayout.from(
+              request.getBlockIdLayout().getSequenceNoBits(),
+              request.getBlockIdLayout().getPartitionIdBits(),
+              request.getBlockIdLayout().getTaskAttemptIdBits());
+    }
 
     StatusCode status = StatusCode.SUCCESS;
     String msg = "OK";
@@ -584,7 +603,8 @@
       serializedBlockIds =
           shuffleServer
               .getShuffleTaskManager()
-              .getFinishedBlockIds(appId, shuffleId, Sets.newHashSet(partitionsList));
+              .getFinishedBlockIds(
+                  appId, shuffleId, Sets.newHashSet(partitionsList), blockIdLayout);
       if (serializedBlockIds == null) {
         status = StatusCode.INTERNAL_ERROR;
         msg = "Can't get shuffle result for " + requestInfo;
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index c40003c..f9ed125 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -62,7 +62,7 @@
 import org.apache.uniffle.common.exception.NoRegisterException;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.rpc.StatusCode;
-import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RssUtils;
@@ -517,7 +517,8 @@
     return requireBuffer("EMPTY", requireSize);
   }
 
-  public byte[] getFinishedBlockIds(String appId, Integer shuffleId, Set<Integer> partitions)
+  public byte[] getFinishedBlockIds(
+      String appId, Integer shuffleId, Set<Integer> partitions, BlockIdLayout blockIdLayout)
       throws IOException {
     refreshAppId(appId);
     for (int partitionId : partitions) {
@@ -525,10 +526,11 @@
           shuffleBufferManager.getShuffleBufferEntry(appId, shuffleId, partitionId);
       if (entry == null) {
         LOG.error(
-            "The empty shuffle buffer, this should not happen. appId: {}, shuffleId: {}, partition: {}",
+            "The empty shuffle buffer, this should not happen. appId: {}, shuffleId: {}, partition: {}, layout: {}",
             appId,
             shuffleId,
-            partitionId);
+            partitionId,
+            blockIdLayout);
         continue;
       }
       Storage storage =
@@ -564,7 +566,7 @@
     for (Map.Entry<Integer, Set<Integer>> entry : bitmapIndexToPartitions.entrySet()) {
       Set<Integer> requestPartitions = entry.getValue();
       Roaring64NavigableMap bitmap = blockIds[entry.getKey()];
-      getBlockIdsByPartitionId(requestPartitions, bitmap, res);
+      getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout);
     }
     return RssUtils.serializeBitMap(res);
   }
@@ -573,10 +575,11 @@
   protected Roaring64NavigableMap getBlockIdsByPartitionId(
       Set<Integer> requestPartitions,
       Roaring64NavigableMap bitmap,
-      Roaring64NavigableMap resultBitmap) {
+      Roaring64NavigableMap resultBitmap,
+      BlockIdLayout blockIdLayout) {
     bitmap.forEach(
         blockId -> {
-          int partitionId = BlockId.getPartitionId(blockId);
+          int partitionId = blockIdLayout.getPartitionId(blockId);
           if (requestPartitions.contains(partitionId)) {
             resultBitmap.addLong(blockId);
           }
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 54a638b..70d2529 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -54,9 +54,8 @@
 import org.apache.uniffle.common.exception.NoRegisterException;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.rpc.StatusCode;
-import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
@@ -707,6 +706,7 @@
 
   @Test
   public void getBlockIdsByPartitionIdTest() {
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
     ShuffleServerConf conf = new ShuffleServerConf();
     ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, null, null, null);
 
@@ -716,7 +716,7 @@
     for (int taskId = 1; taskId < 10; taskId++) {
       for (int partitionId = 1; partitionId < 10; partitionId++) {
         for (int i = 0; i < 2; i++) {
-          long blockId = BlockId.getBlockId(i, partitionId, taskId);
+          long blockId = layout.getBlockId(i, partitionId, taskId);
           bitmapBlockIds.addLong(blockId);
           if (partitionId == expectedPartitionId) {
             expectedBlockIds.addLong(blockId);
@@ -726,29 +726,33 @@
     }
     Roaring64NavigableMap resultBlockIds =
         shuffleTaskManager.getBlockIdsByPartitionId(
-            Sets.newHashSet(expectedPartitionId), bitmapBlockIds, Roaring64NavigableMap.bitmapOf());
+            Sets.newHashSet(expectedPartitionId),
+            bitmapBlockIds,
+            Roaring64NavigableMap.bitmapOf(),
+            layout);
     assertEquals(expectedBlockIds, resultBlockIds);
 
-    bitmapBlockIds.addLong(BlockId.getBlockId(0, 0, 0));
+    bitmapBlockIds.addLong(layout.getBlockId(0, 0, 0));
     resultBlockIds =
         shuffleTaskManager.getBlockIdsByPartitionId(
-            Sets.newHashSet(0), bitmapBlockIds, Roaring64NavigableMap.bitmapOf());
+            Sets.newHashSet(0), bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout);
     assertEquals(Roaring64NavigableMap.bitmapOf(0L), resultBlockIds);
 
     long expectedBlockId =
-        BlockId.getBlockId(
-            Constants.MAX_SEQUENCE_NO, Constants.MAX_PARTITION_ID, Constants.MAX_TASK_ATTEMPT_ID);
+        layout.getBlockId(layout.maxSequenceNo, layout.maxPartitionId, layout.maxTaskAttemptId);
     bitmapBlockIds.addLong(expectedBlockId);
     resultBlockIds =
         shuffleTaskManager.getBlockIdsByPartitionId(
-            Sets.newHashSet(Constants.MAX_PARTITION_ID),
+            Sets.newHashSet(layout.maxPartitionId),
             bitmapBlockIds,
-            Roaring64NavigableMap.bitmapOf());
+            Roaring64NavigableMap.bitmapOf(),
+            layout);
     assertEquals(Roaring64NavigableMap.bitmapOf(expectedBlockId), resultBlockIds);
   }
 
   @Test
   public void getBlockIdsByMultiPartitionTest() {
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
     ShuffleServerConf conf = new ShuffleServerConf();
     ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, null, null, null);
 
@@ -759,7 +763,7 @@
     for (int taskId = 1; taskId < 10; taskId++) {
       for (int partitionId = 1; partitionId < 10; partitionId++) {
         for (int i = 0; i < 2; i++) {
-          long blockId = BlockId.getBlockId(i, partitionId, taskId);
+          long blockId = layout.getBlockId(i, partitionId, taskId);
           bitmapBlockIds.addLong(blockId);
           if (partitionId >= startPartition && partitionId <= endPartition) {
             expectedBlockIds.addLong(blockId);
@@ -778,12 +782,12 @@
 
     Roaring64NavigableMap resultBlockIds =
         shuffleTaskManager.getBlockIdsByPartitionId(
-            requestPartitions, bitmapBlockIds, Roaring64NavigableMap.bitmapOf());
+            requestPartitions, bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout);
     assertEquals(expectedBlockIds, resultBlockIds);
     assertEquals(
         bitmapBlockIds,
         shuffleTaskManager.getBlockIdsByPartitionId(
-            allPartitions, bitmapBlockIds, Roaring64NavigableMap.bitmapOf()));
+            allPartitions, bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout));
   }
 
   @Test
@@ -819,6 +823,7 @@
 
     int startPartition = 6;
     int endPartition = 9;
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
     Roaring64NavigableMap expectedBlockIds = Roaring64NavigableMap.bitmapOf();
     Map<Integer, long[]> blockIdsToReport = Maps.newHashMap();
 
@@ -832,7 +837,7 @@
       long[] blockIds = new long[taskNum * blocksPerTask];
       for (int taskId = 0; taskId < taskNum; taskId++) {
         for (int i = 0; i < blocksPerTask; i++) {
-          long blockId = BlockId.getBlockId(i, partitionId, taskId);
+          long blockId = layout.getBlockId(i, partitionId, taskId);
           blockIds[taskId * blocksPerTask + i] = blockId;
         }
       }
@@ -851,7 +856,7 @@
       requestPartitions.add(partitionId);
     }
     byte[] serializeBitMap =
-        shuffleTaskManager.getFinishedBlockIds(appId, shuffleId, requestPartitions);
+        shuffleTaskManager.getFinishedBlockIds(appId, shuffleId, requestPartitions, layout);
     Roaring64NavigableMap resBlockIds = RssUtils.deserializeBitMap(serializeBitMap);
     assertEquals(expectedBlockIds, resBlockIds);
 
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/FileBasedShuffleSegment.java b/storage/src/main/java/org/apache/uniffle/storage/common/FileBasedShuffleSegment.java
index 38f3abb..625c128 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/FileBasedShuffleSegment.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/FileBasedShuffleSegment.java
@@ -19,8 +19,6 @@
 
 import java.util.Objects;
 
-import org.apache.uniffle.common.util.BlockId;
-
 public class FileBasedShuffleSegment extends ShuffleSegment
     implements Comparable<FileBasedShuffleSegment> {
 
@@ -121,9 +119,9 @@
         + uncompressLength
         + "], crc["
         + crc
-        + "], "
-        + BlockId.toString(blockId)
-        + ", taskAttemptId["
+        + "], blockid["
+        + blockId
+        + "], taskAttemptId["
         + taskAttemptId
         + "]}";
   }
diff --git a/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java b/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
index 7b30493..ea29a47 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
@@ -30,7 +30,7 @@
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
-import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ByteBufUtils;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
@@ -56,7 +56,8 @@
     for (int i = 0; i < num; i++) {
       byte[] buf = new byte[length];
       new Random().nextBytes(buf);
-      long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), 0, taskAttemptId);
+      BlockIdLayout layout = BlockIdLayout.DEFAULT;
+      long blockId = layout.getBlockId(ATOMIC_INT.getAndIncrement(), 0, taskAttemptId);
       blocks.add(
           new ShufflePartitionedBlock(
               length, length, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf));
@@ -76,12 +77,13 @@
       Map<Integer, List<FileBasedShuffleSegment>> expectedIndexSegments,
       boolean doWrite)
       throws Exception {
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
     List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
     List<FileBasedShuffleSegment> segments = Lists.newArrayList();
     for (int i = 0; i < num; i++) {
       byte[] buf = new byte[length];
       new Random().nextBytes(buf);
-      long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), 0, taskAttemptId);
+      long blockId = layout.getBlockId(ATOMIC_INT.getAndIncrement(), 0, taskAttemptId);
       blocks.add(
           new ShufflePartitionedBlock(
               length, length, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf));
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
index 8b307a3..c11fc27 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
@@ -36,7 +36,7 @@
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
-import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.storage.HadoopShuffleHandlerTestBase;
 import org.apache.uniffle.storage.HadoopTestBase;
@@ -118,7 +118,8 @@
     List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
     byte[] buf = new byte[blockSize];
     new Random().nextBytes(buf);
-    long blockId = BlockId.getBlockId(expectTotalBlockNum, 0, taskAttemptId);
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
+    long blockId = layout.getBlockId(expectTotalBlockNum, 0, taskAttemptId);
     blocks.add(
         new ShufflePartitionedBlock(
             blockSize, blockSize, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf));
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
index 19a2905..729a3a3 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
@@ -33,7 +33,7 @@
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.segment.FixedSizeSegmentSplitter;
-import org.apache.uniffle.common.util.BlockId;
+import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ByteBufUtils;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
@@ -47,11 +47,12 @@
   private static AtomicInteger ATOMIC_INT = new AtomicInteger(0);
 
   public static List<ShufflePartitionedBlock> generateBlocks(int num, int length) {
+    BlockIdLayout layout = BlockIdLayout.DEFAULT;
     List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
     for (int i = 0; i < num; i++) {
       byte[] buf = new byte[length];
       new Random().nextBytes(buf);
-      long blockId = BlockId.getBlockId(ATOMIC_INT.incrementAndGet(), 0, 100);
+      long blockId = layout.getBlockId(ATOMIC_INT.incrementAndGet(), 0, 100);
       blocks.add(
           new ShufflePartitionedBlock(
               length, length, ChecksumUtils.getCrc32(buf), blockId, 100, buf));