[MINOR] refactor(common): Move blockId bit logic into common class (#1527)

### What changes were proposed in this pull request?
Moves block id bit manipulation logic into one place in `common.util`. Logic is tested once and reused everywhere. Reused in `RssTezUtils` and `RssMRUtils` where reasonable. Aligns the order of arguments that constructs a `BlockId` with the bit-wise order inside the block id (highest to lowest).

### Why are the changes needed?
Reduces code duplication and improves code readability.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit tests.
diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 5be31d3..e92bf07 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -37,32 +37,34 @@
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.Constants;
 
 public class RssMRUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(RssMRUtils.class);
   private static final int MAX_ATTEMPT_LENGTH = 6;
-  private static final long MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
+  private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
+  private static final int MAX_SEQUENCE_NO =
+      (1 << (Constants.ATOMIC_INT_MAX_LENGTH - MAX_ATTEMPT_LENGTH)) - 1;
 
   // Class TaskAttemptId have two field id and mapId, rss taskAttemptID have 21 bits,
   // mapId is 19 bits, id is 2 bits. MR have a trick logic, taskAttemptId will increase
   // 1000 * (appAttemptId - 1), so we will decrease it.
   public static long convertTaskAttemptIdToLong(TaskAttemptID taskAttemptID, int appAttemptId) {
-    long lowBytes = taskAttemptID.getTaskID().getId();
+    int lowBytes = taskAttemptID.getTaskID().getId();
     if (lowBytes > Constants.MAX_TASK_ATTEMPT_ID) {
       throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " + lowBytes + " exceed");
     }
     if (appAttemptId < 1) {
       throw new RssException("appAttemptId " + appAttemptId + " is wrong");
     }
-    long highBytes = (long) taskAttemptID.getId() - (appAttemptId - 1) * 1000;
+    int highBytes = taskAttemptID.getId() - (appAttemptId - 1) * 1000;
     if (highBytes > MAX_ATTEMPT_ID || highBytes < 0) {
       throw new RssException(
           "TaskAttempt " + taskAttemptID + " high bytes " + highBytes + " exceed");
     }
-    return (highBytes << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH))
-        + lowBytes;
+    return BlockId.getBlockId(highBytes, 0, lowBytes);
   }
 
   public static TaskAttemptID createMRTaskAttemptId(
@@ -70,14 +72,9 @@
     if (appAttemptId < 1) {
       throw new RssException("appAttemptId " + appAttemptId + " is wrong");
     }
-    TaskID taskID =
-        new TaskID(jobID, taskType, (int) (rssTaskAttemptId & Constants.MAX_TASK_ATTEMPT_ID));
-    return new TaskAttemptID(
-        taskID,
-        (int)
-                (rssTaskAttemptId
-                    >> (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH))
-            + 1000 * (appAttemptId - 1));
+    TaskID taskID = new TaskID(jobID, taskType, BlockId.getTaskAttemptId(rssTaskAttemptId));
+    int id = BlockId.getSequenceNo(rssTaskAttemptId) + 1000 * (appAttemptId - 1);
+    return new TaskAttemptID(taskID, id);
   }
 
   public static ShuffleWriteClient createShuffleClient(JobConf jobConf) {
@@ -229,51 +226,31 @@
     return rssJobConf.get(key, defaultValue);
   }
 
-  public static long getBlockId(long partitionId, long taskAttemptId, int nextSeqNo) {
+  public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {
     long attemptId =
         taskAttemptId >> (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH);
     if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
       throw new RssException(
           "Can't support attemptId [" + attemptId + "], the max value should be " + MAX_ATTEMPT_ID);
     }
-    long atomicInt = (nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId;
-    if (atomicInt < 0 || atomicInt > Constants.MAX_SEQUENCE_NO) {
+    if (nextSeqNo < 0 || nextSeqNo > MAX_SEQUENCE_NO) {
       throw new RssException(
-          "Can't support sequence ["
-              + atomicInt
-              + "], the max value should be "
-              + Constants.MAX_SEQUENCE_NO);
+          "Can't support sequence [" + nextSeqNo + "], the max value should be " + MAX_SEQUENCE_NO);
     }
-    if (partitionId < 0 || partitionId > Constants.MAX_PARTITION_ID) {
-      throw new RssException(
-          "Can't support partitionId["
-              + partitionId
-              + "], the max value should be "
-              + Constants.MAX_PARTITION_ID);
-    }
+
+    int atomicInt = (int) ((nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId);
     long taskId =
         taskAttemptId
             - (attemptId
                 << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
-    if (taskId < 0 || taskId > Constants.MAX_TASK_ATTEMPT_ID) {
-      throw new RssException(
-          "Can't support taskId["
-              + taskId
-              + "], the max value should be "
-              + Constants.MAX_TASK_ATTEMPT_ID);
-    }
-    return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
-        + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
-        + taskId;
+
+    return BlockId.getBlockId(atomicInt, partitionId, taskId);
   }
 
   public static long getTaskAttemptId(long blockId) {
-    long mapId = blockId & Constants.MAX_TASK_ATTEMPT_ID;
-    long attemptId =
-        (blockId >> (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH))
-            & MAX_ATTEMPT_ID;
-    return (attemptId << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH))
-        + mapId;
+    int mapId = BlockId.getTaskAttemptId(blockId);
+    int attemptId = BlockId.getSequenceNo(blockId) & MAX_ATTEMPT_ID;
+    return BlockId.getBlockId(attemptId, 0, mapId);
   }
 
   public static int estimateTaskConcurrency(JobConf jobConf) {
diff --git a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
index cb5c2c6..c95951e 100644
--- a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
+++ b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
@@ -87,7 +87,7 @@
     long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
     for (int partitionId = 0; partitionId <= 3000; partitionId++) {
       for (int seqNo = 0; seqNo <= 10; seqNo++) {
-        long blockId = RssMRUtils.getBlockId(Long.valueOf(partitionId), taskAttemptId, seqNo);
+        long blockId = RssMRUtils.getBlockId(partitionId, taskAttemptId, seqNo);
         int newPartitionId =
             Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
         assertEquals(partitionId, newPartitionId);
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 5b9a928..9450c0f 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -46,12 +46,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.uniffle.client.util.ClientUtils;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.ChecksumUtils;
 
 public class WriteBufferManager extends MemoryConsumer {
@@ -325,8 +325,7 @@
       compressTime += System.currentTimeMillis() - start;
     }
     final long crc32 = ChecksumUtils.getCrc32(compressed);
-    final long blockId =
-        ClientUtils.getBlockId(partitionId, taskAttemptId, getNextSeqNo(partitionId));
+    final long blockId = BlockId.getBlockId(getNextSeqNo(partitionId), partitionId, taskAttemptId);
     uncompressedDataLen += data.length;
     shuffleWriteMetrics.incBytesWritten(compressed.length);
     // add memory to indicate bytes which will be sent to shuffle server
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
index 0724547..90505d9 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
@@ -34,10 +34,10 @@
 import org.apache.spark.serializer.SerializerInstance;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
-import org.apache.uniffle.client.util.ClientUtils;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.storage.HadoopTestBase;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -106,7 +106,7 @@
         expectedData.put(key, value);
         writeData(serializeStream, key, value);
       }
-      long blockId = ClientUtils.getBlockId(partitionID, 0, atomicInteger.getAndIncrement());
+      long blockId = BlockId.getBlockId(atomicInteger.getAndIncrement(), partitionID, 0);
       blockIdBitmap.add(blockId);
       blocks.add(createShuffleBlock(output.toBytes(), blockId, compress));
       serializeStream.close();
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
index 0dea951..2962bc1 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
@@ -40,9 +40,9 @@
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
-import org.apache.uniffle.client.util.ClientUtils;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
@@ -81,7 +81,7 @@
 
     validateResult(rssShuffleDataIterator, expectedData, 10);
 
-    blockIdBitmap.add(ClientUtils.getBlockId(0, 0, Constants.MAX_SEQUENCE_NO));
+    blockIdBitmap.add(BlockId.getBlockId(Constants.MAX_SEQUENCE_NO, 0, 0));
     rssShuffleDataIterator =
         getDataIterator(basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1));
     int recNum = 0;
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
index e497995..c1e8643 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
@@ -53,6 +53,7 @@
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.Constants;
 
 public class RssTezUtils {
@@ -60,7 +61,9 @@
   private static final Logger LOG = LoggerFactory.getLogger(RssTezUtils.class);
 
   private static final int MAX_ATTEMPT_LENGTH = 6;
-  private static final long MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
+  private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
+  private static final int MAX_SEQUENCE_NO =
+      (1 << (Constants.ATOMIC_INT_MAX_LENGTH - MAX_ATTEMPT_LENGTH)) - 1;
 
   public static final String HOST_NAME = "hostname";
 
@@ -155,7 +158,7 @@
     return StringUtils.join(ids, "_", 0, 7);
   }
 
-  public static long getBlockId(long partitionId, long taskAttemptId, int nextSeqNo) {
+  public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {
     LOG.info(
         "GetBlockId, partitionId:{}, taskAttemptId:{}, nextSeqNo:{}",
         partitionId,
@@ -167,45 +170,24 @@
       throw new RssException(
           "Can't support attemptId [" + attemptId + "], the max value should be " + MAX_ATTEMPT_ID);
     }
-    long atomicInt = (nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId;
-    if (atomicInt < 0 || atomicInt > Constants.MAX_SEQUENCE_NO) {
+    if (nextSeqNo < 0 || nextSeqNo > MAX_SEQUENCE_NO) {
       throw new RssException(
-          "Can't support sequence ["
-              + atomicInt
-              + "], the max value should be "
-              + Constants.MAX_SEQUENCE_NO);
+          "Can't support sequence [" + nextSeqNo + "], the max value should be " + MAX_SEQUENCE_NO);
     }
-    if (partitionId < 0 || partitionId > Constants.MAX_PARTITION_ID) {
-      throw new RssException(
-          "Can't support partitionId["
-              + partitionId
-              + "], the max value should be "
-              + Constants.MAX_PARTITION_ID);
-    }
+
+    int atomicInt = (int) ((nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId);
     long taskId =
         taskAttemptId
             - (attemptId
                 << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
 
-    if (taskId < 0 || taskId > Constants.MAX_TASK_ATTEMPT_ID) {
-      throw new RssException(
-          "Can't support taskId["
-              + taskId
-              + "], the max value should be "
-              + Constants.MAX_TASK_ATTEMPT_ID);
-    }
-    return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
-        + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
-        + taskId;
+    return BlockId.getBlockId(atomicInt, partitionId, taskId);
   }
 
   public static long getTaskAttemptId(long blockId) {
-    long mapId = blockId & Constants.MAX_TASK_ATTEMPT_ID;
-    long attemptId =
-        (blockId >> (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH))
-            & MAX_ATTEMPT_ID;
-    return (attemptId << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH))
-        + mapId;
+    int mapId = BlockId.getTaskAttemptId(blockId);
+    int attemptId = BlockId.getSequenceNo(blockId) & MAX_ATTEMPT_ID;
+    return BlockId.getBlockId(attemptId, 0, mapId);
   }
 
   public static int estimateTaskConcurrency(Configuration jobConf, int mapNum, int reduceNum) {
@@ -298,18 +280,16 @@
   }
 
   public static long convertTaskAttemptIdToLong(TezTaskAttemptID taskAttemptID) {
-    long lowBytes = taskAttemptID.getTaskID().getId();
+    int lowBytes = taskAttemptID.getTaskID().getId();
     if (lowBytes > Constants.MAX_TASK_ATTEMPT_ID) {
       throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " + lowBytes + " exceed");
     }
-    long highBytes = taskAttemptID.getId();
+    int highBytes = taskAttemptID.getId();
     if (highBytes > MAX_ATTEMPT_ID || highBytes < 0) {
       throw new RssException(
           "TaskAttempt " + taskAttemptID + " high bytes " + highBytes + " exceed.");
     }
-    long id =
-        (highBytes << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH))
-            + lowBytes;
+    long id = BlockId.getBlockId(highBytes, 0, lowBytes);
     LOG.info("ConvertTaskAttemptIdToLong taskAttemptID:{}, id is {}, .", taskAttemptID, id);
     return id;
   }
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
index ec83a09..3892458 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
@@ -53,6 +53,7 @@
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 
@@ -371,8 +372,8 @@
     final long crc32 = ChecksumUtils.getCrc32(compressed);
     compressTime += System.currentTimeMillis() - start;
     final long blockId =
-        RssTezUtils.getBlockId((long) partitionId, taskAttemptId, getNextSeqNo(partitionId));
-    LOG.info("blockId is {}", blockId);
+        RssTezUtils.getBlockId(partitionId, taskAttemptId, getNextSeqNo(partitionId));
+    LOG.info("blockId is {}", BlockId.fromLong(blockId));
     uncompressedDataLen += data.length;
     // add memory to indicate bytes which will be sent to shuffle server
     inSendListBytes.addAndGet(wb.getDataLength());
diff --git a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
index c21173d..639521f 100644
--- a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
+++ b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
@@ -99,7 +99,7 @@
     long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
     for (int partitionId = 0; partitionId <= 3000; partitionId++) {
       for (int seqNo = 0; seqNo <= 10; seqNo++) {
-        long blockId = RssTezUtils.getBlockId(Long.valueOf(partitionId), taskAttemptId, seqNo);
+        long blockId = RssTezUtils.getBlockId(partitionId, taskAttemptId, seqNo);
         int newPartitionId =
             Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
         assertEquals(partitionId, newPartitionId);
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 7e2f930..e1fbb9f 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -41,6 +41,7 @@
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssFetchFailedException;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.common.util.IdHelper;
 import org.apache.uniffle.common.util.RssUtils;
@@ -211,14 +212,14 @@
             actualCrc = ChecksumUtils.getCrc32(readBuffer, bs.getOffset(), bs.getLength());
             crcCheckTime.addAndGet(System.currentTimeMillis() - start);
           } catch (Exception e) {
-            LOG.warn("Can't read data for blockId[" + bs.getBlockId() + "]", e);
+            LOG.warn("Can't read data for " + BlockId.toString(bs.getBlockId()), e);
           }
 
           if (expectedCrc != actualCrc) {
             String errMsg =
-                "Unexpected crc value for blockId["
-                    + bs.getBlockId()
-                    + "], expected:"
+                "Unexpected crc value for "
+                    + BlockId.toString(bs.getBlockId())
+                    + ", expected:"
                     + expectedCrc
                     + ", actual:"
                     + actualCrc;
diff --git a/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java b/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
index b3330c8..b3d40dc 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
@@ -29,43 +29,10 @@
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.RemoteStorageInfo;
-import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.util.StorageType;
 
 public class ClientUtils {
 
-  // BlockId is positive long (63 bits) composed of partitionId, taskAttemptId and AtomicInteger.
-  // AtomicInteger is highest 18 bits, max value is 2^18 - 1
-  // partitionId is middle 24 bits, max value is 2^24 - 1
-  // taskAttemptId is lowest 21 bits, max value is 2^21 - 1
-  // Values of partitionId, taskAttemptId and AtomicInteger are always positive.
-  public static Long getBlockId(long partitionId, long taskAttemptId, long atomicInt) {
-    if (atomicInt < 0 || atomicInt > Constants.MAX_SEQUENCE_NO) {
-      throw new IllegalArgumentException(
-          "Can't support sequence["
-              + atomicInt
-              + "], the max value should be "
-              + Constants.MAX_SEQUENCE_NO);
-    }
-    if (partitionId < 0 || partitionId > Constants.MAX_PARTITION_ID) {
-      throw new IllegalArgumentException(
-          "Can't support partitionId["
-              + partitionId
-              + "], the max value should be "
-              + Constants.MAX_PARTITION_ID);
-    }
-    if (taskAttemptId < 0 || taskAttemptId > Constants.MAX_TASK_ATTEMPT_ID) {
-      throw new IllegalArgumentException(
-          "Can't support taskAttemptId["
-              + taskAttemptId
-              + "], the max value should be "
-              + Constants.MAX_TASK_ATTEMPT_ID);
-    }
-    return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
-        + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
-        + taskAttemptId;
-  }
-
   public static RemoteStorageInfo fetchRemoteStorage(
       String appId,
       RemoteStorageInfo defaultRemoteStorage,
diff --git a/client/src/main/java/org/apache/uniffle/client/util/DefaultIdHelper.java b/client/src/main/java/org/apache/uniffle/client/util/DefaultIdHelper.java
index 97376cc..7161c4f 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/DefaultIdHelper.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/DefaultIdHelper.java
@@ -17,12 +17,12 @@
 
 package org.apache.uniffle.client.util;
 
-import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.IdHelper;
 
 public class DefaultIdHelper implements IdHelper {
   @Override
   public long getTaskAttemptId(long blockId) {
-    return blockId & Constants.MAX_TASK_ATTEMPT_ID;
+    return BlockId.getTaskAttemptId(blockId);
   }
 }
diff --git a/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java b/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
index b8e7e7f..19ff6ac 100644
--- a/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
@@ -33,12 +33,11 @@
 
 import org.apache.uniffle.client.util.ClientUtils;
 import org.apache.uniffle.client.util.DefaultIdHelper;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.RssUtils;
 
 import static org.apache.uniffle.client.util.ClientUtils.waitUntilDoneOrFail;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public class ClientUtilsTest {
@@ -47,33 +46,6 @@
   private ExecutorService executorService = Executors.newFixedThreadPool(10);
 
   @Test
-  public void getBlockIdTest() {
-    // max value of blockId
-    assertEquals(new Long(854558029292503039L), ClientUtils.getBlockId(16777215, 1048575, 24287));
-    // just a random test
-    assertEquals(new Long(3518437418598500L), ClientUtils.getBlockId(100, 100, 100));
-    // min value of blockId
-    assertEquals(new Long(0L), ClientUtils.getBlockId(0, 0, 0));
-
-    final Throwable e1 =
-        assertThrows(IllegalArgumentException.class, () -> ClientUtils.getBlockId(16777216, 0, 0));
-    assertTrue(
-        e1.getMessage()
-            .contains("Can't support partitionId[16777216], the max value should be 16777215"));
-
-    final Throwable e2 =
-        assertThrows(IllegalArgumentException.class, () -> ClientUtils.getBlockId(0, 2097152, 0));
-    assertTrue(
-        e2.getMessage()
-            .contains("Can't support taskAttemptId[2097152], the max value should be 2097151"));
-
-    final Throwable e3 =
-        assertThrows(IllegalArgumentException.class, () -> ClientUtils.getBlockId(0, 0, 262144));
-    assertTrue(
-        e3.getMessage().contains("Can't support sequence[262144], the max value should be 262143"));
-  }
-
-  @Test
   public void testGenerateTaskIdBitMap() {
     int partitionId = 1;
     Roaring64NavigableMap blockIdMap = Roaring64NavigableMap.bitmapOf();
@@ -82,7 +54,7 @@
     for (int i = 0; i < taskSize; i++) {
       except[i] = i;
       for (int j = 0; j < 100; j++) {
-        Long blockId = ClientUtils.getBlockId(partitionId, i, j);
+        long blockId = BlockId.getBlockId(j, partitionId, i);
         blockIdMap.addLong(blockId);
       }
     }
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
index 14e9af9..eda8136 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
@@ -21,7 +21,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -38,6 +38,7 @@
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.HadoopTestBase;
@@ -53,7 +54,7 @@
 public class ShuffleReadClientImplTest extends HadoopTestBase {
 
   private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should be thrown";
-  private static AtomicLong ATOMIC_LONG = new AtomicLong(0);
+  private static AtomicInteger ATOMIC_INT = new AtomicInteger(0);
 
   private ShuffleServerInfo ssi1 = new ShuffleServerInfo("host1-0", "host1", 0);
   private ShuffleServerInfo ssi2 = new ShuffleServerInfo("host2-0", "host2", 0);
@@ -91,7 +92,7 @@
     readClient.checkProcessedBlockIds();
     readClient.close();
 
-    blockIdBitmap.addLong(Constants.MAX_TASK_ATTEMPT_ID - 1);
+    blockIdBitmap.addLong(BlockId.getBlockId(0, 0, Constants.MAX_TASK_ATTEMPT_ID - 1));
     taskIdBitmap.addLong(Constants.MAX_TASK_ATTEMPT_ID - 1);
     readClient =
         baseReadBuilder()
@@ -377,7 +378,9 @@
     Roaring64NavigableMap wrongBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
     LongIterator iter = blockIdBitmap.getLongIterator();
     while (iter.hasNext()) {
-      wrongBlockIdBitmap.addLong(iter.next() + (1 << Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+      BlockId blockId = BlockId.fromLong(iter.next());
+      wrongBlockIdBitmap.addLong(
+          BlockId.getBlockId(blockId.sequenceNo, blockId.partitionId + 1, blockId.taskAttemptId));
     }
 
     ShuffleReadClientImpl readClient =
@@ -585,10 +588,7 @@
     for (int i = 0; i < num; i++) {
       byte[] buf = new byte[length];
       new Random().nextBytes(buf);
-      long blockId =
-          (ATOMIC_LONG.getAndIncrement()
-                  << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
-              + taskAttemptId;
+      long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), 0, taskAttemptId);
       blocks.add(
           new ShufflePartitionedBlock(
               length, length, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf));
@@ -610,10 +610,7 @@
     for (int i = 0; i < num; i++) {
       byte[] buf = new byte[length];
       new Random().nextBytes(buf);
-      long blockId =
-          (ATOMIC_LONG.getAndIncrement()
-                  << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
-              + taskAttemptId;
+      long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), 0, taskAttemptId);
       ShufflePartitionedBlock spb =
           new ShufflePartitionedBlock(
               length, length, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf);
diff --git a/common/src/main/java/org/apache/uniffle/common/BufferSegment.java b/common/src/main/java/org/apache/uniffle/common/BufferSegment.java
index 1ab68c5..ac7faa4 100644
--- a/common/src/main/java/org/apache/uniffle/common/BufferSegment.java
+++ b/common/src/main/java/org/apache/uniffle/common/BufferSegment.java
@@ -20,6 +20,7 @@
 import java.util.Objects;
 
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockId;
 
 public class BufferSegment {
 
@@ -60,9 +61,9 @@
 
   @Override
   public String toString() {
-    return "BufferSegment{blockId["
-        + blockId
-        + "], taskAttemptId["
+    return "BufferSegment{"
+        + BlockId.toString(blockId)
+        + ", taskAttemptId["
         + taskAttemptId
         + "], offset["
         + offset
diff --git a/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java b/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java
index 8de75d9..8b5fdc6 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java
@@ -22,6 +22,7 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.ByteBufUtils;
 
 public class ShuffleBlockInfo {
@@ -136,7 +137,7 @@
     sb.append("ShuffleBlockInfo:");
     sb.append("shuffleId[" + shuffleId + "],");
     sb.append("partitionId[" + partitionId + "],");
-    sb.append("blockId[" + blockId + "],");
+    sb.append(BlockId.toString(blockId) + ",");
     sb.append("length[" + length + "],");
     sb.append("uncompressLength[" + uncompressLength + "],");
     sb.append("crc[" + crc + "],");
diff --git a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
index 1ce68b6..4772b50 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
@@ -22,6 +22,8 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
+import org.apache.uniffle.common.util.BlockId;
+
 public class ShufflePartitionedBlock {
 
   private int length;
@@ -123,9 +125,9 @@
 
   @Override
   public String toString() {
-    return "ShufflePartitionedBlock{blockId["
-        + blockId
-        + "], length["
+    return "ShufflePartitionedBlock{"
+        + BlockId.toString(blockId)
+        + ", length["
         + length
         + "], uncompressLength["
         + uncompressLength
diff --git a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
index 7b3626d..898d0b1 100644
--- a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
+++ b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
@@ -29,7 +29,7 @@
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.BlockId;
 
 public class FixedSizeSegmentSplitter implements SegmentSplitter {
   private static final Logger LOGGER = LoggerFactory.getLogger(FixedSizeSegmentSplitter.class);
@@ -82,14 +82,13 @@
         // than the length in the actual data file, and it needs to be returned at this time to
         // avoid EOFException
         if (dataFileLen != -1 && totalLength > dataFileLen) {
-          long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
           LOGGER.info(
               "Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than "
                   + "the real data file length: {}(bytes). Partition id: {}. "
                   + "This may happen when the data is flushing, please ignore.",
               totalLength,
               dataFileLen,
-              Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask));
+              BlockId.getPartitionId(blockId));
           break;
         }
 
diff --git a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
index 98b9cca..cbf0979 100644
--- a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
+++ b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
@@ -31,7 +31,7 @@
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.BlockId;
 
 /**
  * {@class LocalOrderSegmentSplitter} will be initialized only when the {@class
@@ -104,14 +104,13 @@
         // than the length in the actual data file, and it needs to be returned at this time to
         // avoid EOFException
         if (dataFileLen != -1 && totalLen > dataFileLen) {
-          long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
           LOGGER.info(
               "Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than "
                   + "the real data file length: {}(bytes). Partition id: {}. This should not happen. "
                   + "This may happen when the data is flushing, please ignore.",
               totalLen,
               dataFileLen,
-              Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask));
+              BlockId.getPartitionId(blockId));
           break;
         }
 
diff --git a/common/src/main/java/org/apache/uniffle/common/util/BlockId.java b/common/src/main/java/org/apache/uniffle/common/util/BlockId.java
new file mode 100644
index 0000000..7aecaae
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/util/BlockId.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+// BlockId is positive long (63 bits) composed of partitionId, taskAttemptId and AtomicInteger.
+// AtomicInteger is highest 18 bits, max value is 2^18 - 1
+// partitionId is middle 24 bits, max value is 2^24 - 1
+// taskAttemptId is lowest 21 bits, max value is 2^21 - 1
+// Values of partitionId, taskAttemptId and AtomicInteger are always positive.
+public class BlockId {
+  public final long blockId;
+  public final int sequenceNo;
+  public final int partitionId;
+  public final int taskAttemptId;
+
+  private BlockId(long blockId, int sequenceNo, int partitionId, int taskAttemptId) {
+    this.blockId = blockId;
+    this.sequenceNo = sequenceNo;
+    this.partitionId = partitionId;
+    this.taskAttemptId = taskAttemptId;
+  }
+
+  @Override
+  public String toString() {
+    return "blockId["
+        + Long.toHexString(blockId)
+        + " (seq: "
+        + sequenceNo
+        + ", part: "
+        + partitionId
+        + ", task: "
+        + taskAttemptId
+        + ")]";
+  }
+
+  public static String toString(long blockId) {
+    return BlockId.fromLong(blockId).toString();
+  }
+
+  public static BlockId fromLong(long blockId) {
+    int sequenceNo = getSequenceNo(blockId);
+    int partitionId = getPartitionId(blockId);
+    int taskAttemptId = getTaskAttemptId(blockId);
+    return new BlockId(blockId, sequenceNo, partitionId, taskAttemptId);
+  }
+
+  public static BlockId fromIds(int sequenceNo, int partitionId, int taskAttemptId) {
+    long blockId = getBlockId(sequenceNo, partitionId, taskAttemptId);
+    return new BlockId(blockId, sequenceNo, partitionId, taskAttemptId);
+  }
+
+  public static long getBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
+    if (sequenceNo < 0 || sequenceNo > Constants.MAX_SEQUENCE_NO) {
+      throw new IllegalArgumentException(
+          "Can't support sequence["
+              + sequenceNo
+              + "], the max value should be "
+              + Constants.MAX_SEQUENCE_NO);
+    }
+    if (partitionId < 0 || partitionId > Constants.MAX_PARTITION_ID) {
+      throw new IllegalArgumentException(
+          "Can't support partitionId["
+              + partitionId
+              + "], the max value should be "
+              + Constants.MAX_PARTITION_ID);
+    }
+    if (taskAttemptId < 0 || taskAttemptId > Constants.MAX_TASK_ATTEMPT_ID) {
+      throw new IllegalArgumentException(
+          "Can't support taskAttemptId["
+              + taskAttemptId
+              + "], the max value should be "
+              + Constants.MAX_TASK_ATTEMPT_ID);
+    }
+
+    return ((long) sequenceNo
+            << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
+        + ((long) partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
+        + taskAttemptId;
+  }
+
+  public static int getSequenceNo(long blockId) {
+    return (int)
+        (blockId >> (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+  }
+
+  public static int getPartitionId(long blockId) {
+    return (int) ((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & Constants.MAX_PARTITION_ID);
+  }
+
+  public static int getTaskAttemptId(long blockId) {
+    return (int) (blockId & Constants.MAX_TASK_ATTEMPT_ID);
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index 4b35463..a361eab 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -29,14 +29,17 @@
   public static final String SHUFFLE_INDEX_FILE_SUFFIX = ".index";
   // BlockId is long and consist of partitionId, taskAttemptId, atomicInt
   // the length of them are ATOMIC_INT_MAX_LENGTH + PARTITION_ID_MAX_LENGTH +
-  // TASK_ATTEMPT_ID_MAX_LENGTH = 63
+  // TASK_ATTEMPT_ID_MAX_LENGTH = 63, each of these lengths must be less than 32
+  // do not access the fields, no need to implement your own bit manipulation logic,
+  // better use methods provided by org.apache.uniffle.common.util.BlockId
   public static final int PARTITION_ID_MAX_LENGTH = 24;
   public static final int TASK_ATTEMPT_ID_MAX_LENGTH = 21;
   public static final int ATOMIC_INT_MAX_LENGTH = 18;
-  public static final long MAX_SEQUENCE_NO = (1 << Constants.ATOMIC_INT_MAX_LENGTH) - 1;
-  public static final long MAX_PARTITION_ID = (1 << Constants.PARTITION_ID_MAX_LENGTH) - 1;
-  public static final long MAX_TASK_ATTEMPT_ID = (1 << Constants.TASK_ATTEMPT_ID_MAX_LENGTH) - 1;
+  public static final int MAX_SEQUENCE_NO = (1 << Constants.ATOMIC_INT_MAX_LENGTH) - 1;
+  public static final int MAX_PARTITION_ID = (1 << Constants.PARTITION_ID_MAX_LENGTH) - 1;
+  public static final int MAX_TASK_ATTEMPT_ID = (1 << Constants.TASK_ATTEMPT_ID_MAX_LENGTH) - 1;
   public static final long INVALID_BLOCK_ID = -1L;
+
   public static final String KEY_SPLIT_CHAR = "/";
   public static final String COMMA_SPLIT_CHAR = ",";
   public static final String EQUAL_SPLIT_CHAR = "=";
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index ad2b4c4..6d4e230 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -335,10 +335,9 @@
       result.computeIfAbsent(partitionId, key -> Roaring64NavigableMap.bitmapOf());
     }
     Iterator<Long> it = shuffleBitmap.iterator();
-    long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
     while (it.hasNext()) {
       Long blockId = it.next();
-      int partitionId = Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
+      int partitionId = BlockId.getPartitionId(blockId);
       if (partitionId >= startPartition && partitionId < endPartition) {
         result.get(partitionId).add(blockId);
       }
diff --git a/common/src/test/java/org/apache/uniffle/common/BufferSegmentTest.java b/common/src/test/java/org/apache/uniffle/common/BufferSegmentTest.java
index ca10485..49a1c58 100644
--- a/common/src/test/java/org/apache/uniffle/common/BufferSegmentTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/BufferSegmentTest.java
@@ -58,7 +58,7 @@
   public void testToString() {
     BufferSegment segment = new BufferSegment(0, 1, 2, 3, 4, 5);
     assertEquals(
-        "BufferSegment{blockId[0], taskAttemptId[5], offset[1], length[2], crc[4], uncompressLength[3]}",
+        "BufferSegment{blockId[0 (seq: 0, part: 0, task: 0)], taskAttemptId[5], offset[1], length[2], crc[4], uncompressLength[3]}",
         segment.toString());
   }
 
diff --git a/common/src/test/java/org/apache/uniffle/common/ShuffleBlockInfoTest.java b/common/src/test/java/org/apache/uniffle/common/ShuffleBlockInfoTest.java
index 71db702..fdac0e9 100644
--- a/common/src/test/java/org/apache/uniffle/common/ShuffleBlockInfoTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ShuffleBlockInfoTest.java
@@ -22,6 +22,8 @@
 
 import org.junit.jupiter.api.Test;
 
+import org.apache.uniffle.common.util.BlockId;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class ShuffleBlockInfoTest {
@@ -37,9 +39,9 @@
             + info.getShuffleId()
             + "],partitionId["
             + info.getPartitionId()
-            + "],blockId["
-            + info.getBlockId()
-            + "],length["
+            + "],"
+            + BlockId.toString(info.getBlockId())
+            + ",length["
             + info.getLength()
             + "],uncompressLength["
             + info.getUncompressLength()
@@ -54,9 +56,9 @@
             + info2.getShuffleId()
             + "],partitionId["
             + info2.getPartitionId()
-            + "],blockId["
-            + info2.getBlockId()
-            + "],length["
+            + "],"
+            + BlockId.toString(info2.getBlockId())
+            + ",length["
             + info2.getLength()
             + "],uncompressLength["
             + info2.getUncompressLength()
diff --git a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
index 3f894ab..7f402e5 100644
--- a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
@@ -23,6 +23,7 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.ByteBufUtils;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -70,9 +71,9 @@
   public void testToString() {
     ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 2, 3, 4, 5, new byte[6]);
     assertEquals(
-        "ShufflePartitionedBlock{blockId["
-            + b1.getBlockId()
-            + "], length["
+        "ShufflePartitionedBlock{"
+            + BlockId.toString(b1.getBlockId())
+            + ", length["
             + b1.getLength()
             + "], uncompressLength["
             + b1.getUncompressLength()
diff --git a/common/src/test/java/org/apache/uniffle/common/util/BlockIdTest.java b/common/src/test/java/org/apache/uniffle/common/util/BlockIdTest.java
new file mode 100644
index 0000000..e42717d
--- /dev/null
+++ b/common/src/test/java/org/apache/uniffle/common/util/BlockIdTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class BlockIdTest {
+  @Test
+  public void test() {
+    // max value of blockId
+    assertEquals(854558029292503039L, BlockId.getBlockId(24287, 16777215, 1048575));
+    // just a random test
+    assertEquals(3518437418598500L, BlockId.getBlockId(100, 100, 100));
+    // min value of blockId
+    assertEquals(0L, BlockId.getBlockId(0, 0, 0));
+
+    BlockId blockId = BlockId.fromIds(100, 100, 100);
+    assertEquals("blockId[c80000c800064 (seq: 100, part: 100, task: 100)]", blockId.toString());
+
+    final Throwable e1 =
+        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(262144, 0, 0));
+    assertEquals("Can't support sequence[262144], the max value should be 262143", e1.getMessage());
+
+    final Throwable e2 =
+        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(0, 16777216, 0));
+    assertEquals(
+        "Can't support partitionId[16777216], the max value should be 16777215", e2.getMessage());
+
+    final Throwable e3 =
+        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(0, 0, 2097152));
+    assertEquals(
+        "Can't support taskAttemptId[2097152], the max value should be 2097151", e3.getMessage());
+
+    final Throwable e4 =
+        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(-1, 0, 0));
+    assertEquals("Can't support sequence[-1], the max value should be 262143", e4.getMessage());
+
+    final Throwable e5 =
+        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(0, -1, 0));
+    assertEquals(
+        "Can't support partitionId[-1], the max value should be 16777215", e5.getMessage());
+
+    final Throwable e6 =
+        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(0, 0, -1));
+    assertEquals(
+        "Can't support taskAttemptId[-1], the max value should be 2097151", e6.getMessage());
+  }
+}
diff --git a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index 0e70486..d8457ae 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -229,10 +229,16 @@
   public void testShuffleBitmapToPartitionBitmap() {
     Roaring64NavigableMap partition1Bitmap =
         Roaring64NavigableMap.bitmapOf(
-            getBlockId(0, 0, 0), getBlockId(0, 0, 1), getBlockId(0, 1, 0), getBlockId(0, 1, 1));
+            BlockId.getBlockId(0, 0, 0),
+            BlockId.getBlockId(1, 0, 0),
+            BlockId.getBlockId(0, 0, 1),
+            BlockId.getBlockId(1, 0, 1));
     Roaring64NavigableMap partition2Bitmap =
         Roaring64NavigableMap.bitmapOf(
-            getBlockId(1, 0, 0), getBlockId(1, 0, 1), getBlockId(1, 1, 0), getBlockId(1, 1, 1));
+            BlockId.getBlockId(0, 1, 0),
+            BlockId.getBlockId(1, 1, 0),
+            BlockId.getBlockId(0, 1, 1),
+            BlockId.getBlockId(1, 1, 1));
     Roaring64NavigableMap shuffleBitmap = Roaring64NavigableMap.bitmapOf();
     shuffleBitmap.or(partition1Bitmap);
     shuffleBitmap.or(partition2Bitmap);
@@ -291,13 +297,6 @@
             });
   }
 
-  // Copy from ClientUtils
-  private Long getBlockId(long partitionId, long taskAttemptId, long atomicInt) {
-    return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
-        + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
-        + taskAttemptId;
-  }
-
   interface RssUtilTestDummy {
     String get();
   }
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
index dc43ca1..c21808c 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
@@ -22,7 +22,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -40,12 +40,12 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.segment.FixedSizeSegmentSplitter;
 import org.apache.uniffle.common.segment.SegmentSplitter;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.Constants;
 
 public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
 
-  private static AtomicLong ATOMIC_LONG = new AtomicLong(0L);
+  private static AtomicInteger ATOMIC_INT = new AtomicInteger(0);
   public static List<ShuffleServerInfo> mockSSI =
       Lists.newArrayList(new ShuffleServerInfo("id", "host", 0));
 
@@ -62,11 +62,9 @@
     for (int i = 0; i < blockNum; i++) {
       byte[] buf = new byte[length];
       new Random().nextBytes(buf);
-      long seqno = ATOMIC_LONG.getAndIncrement();
+      int seqno = ATOMIC_INT.getAndIncrement();
 
-      long blockId =
-          (seqno << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
-              + taskAttemptId;
+      long blockId = BlockId.getBlockId(seqno, 0, taskAttemptId);
       blockIdBitmap.addLong(blockId);
       dataMap.put(blockId, buf);
       shuffleBlockInfoList.add(
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index a1d662c..64d7739 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -56,7 +56,6 @@
 import org.apache.uniffle.client.response.RssRegisterShuffleResponse;
 import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
 import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
-import org.apache.uniffle.client.util.ClientUtils;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
@@ -65,6 +64,7 @@
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.metrics.TestUtils;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.proto.RssProtos;
@@ -338,17 +338,17 @@
     assertEquals(expectedP3, blockIdBitmap);
 
     partitionToBlockIds = Maps.newHashMap();
-    blockIds1 = getBlockIdList((int) Constants.MAX_PARTITION_ID, 3);
+    blockIds1 = getBlockIdList(Constants.MAX_PARTITION_ID, 3);
     blockIds2 = getBlockIdList(2, 2);
     blockIds3 = getBlockIdList(3, 1);
-    partitionToBlockIds.put((int) Constants.MAX_PARTITION_ID, blockIds1);
+    partitionToBlockIds.put(Constants.MAX_PARTITION_ID, blockIds1);
     partitionToBlockIds.put(2, blockIds2);
     partitionToBlockIds.put(3, blockIds3);
     // bimapNum = 2
     request = new RssReportShuffleResultRequest("shuffleResultTest", 4, 1L, partitionToBlockIds, 2);
     shuffleServerClient.reportShuffleResult(request);
 
-    req = new RssGetShuffleResultRequest("shuffleResultTest", 4, (int) Constants.MAX_PARTITION_ID);
+    req = new RssGetShuffleResultRequest("shuffleResultTest", 4, Constants.MAX_PARTITION_ID);
     result = shuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
     expectedP1 = Roaring64NavigableMap.bitmapOf();
@@ -610,7 +610,7 @@
           for (int i = 0; i < 100; i++) {
             Map<Integer, List<Long>> ptbs = Maps.newHashMap();
             List<Long> blockIds = Lists.newArrayList();
-            Long blockId = ClientUtils.getBlockId(1, 0, i);
+            Long blockId = BlockId.getBlockId(i, 1, 0);
             expectedBlockIds.add(blockId);
             blockIds.add(blockId);
             ptbs.put(1, blockIds);
@@ -624,7 +624,7 @@
           for (int i = 100; i < 200; i++) {
             Map<Integer, List<Long>> ptbs = Maps.newHashMap();
             List<Long> blockIds = Lists.newArrayList();
-            Long blockId = ClientUtils.getBlockId(1, 1, i);
+            Long blockId = BlockId.getBlockId(i, 1, 1);
             expectedBlockIds.add(blockId);
             blockIds.add(blockId);
             ptbs.put(1, blockIds);
@@ -638,7 +638,7 @@
           for (int i = 200; i < 300; i++) {
             Map<Integer, List<Long>> ptbs = Maps.newHashMap();
             List<Long> blockIds = Lists.newArrayList();
-            Long blockId = ClientUtils.getBlockId(1, 2, i);
+            Long blockId = BlockId.getBlockId(i, 1, 2);
             expectedBlockIds.add(blockId);
             blockIds.add(blockId);
             ptbs.put(1, blockIds);
@@ -984,7 +984,7 @@
   private List<Long> getBlockIdList(int partitionId, int blockNum) {
     List<Long> blockIds = Lists.newArrayList();
     for (int i = 0; i < blockNum; i++) {
-      blockIds.add(ClientUtils.getBlockId(partitionId, 0, atomicInteger.getAndIncrement()));
+      blockIds.add(BlockId.getBlockId(atomicInteger.getAndIncrement(), partitionId, 0));
     }
     return blockIds;
   }
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 1442b21..d87bada 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -37,7 +37,6 @@
 import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
 import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
 import org.apache.uniffle.client.response.SendShuffleDataResult;
-import org.apache.uniffle.client.util.ClientUtils;
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
@@ -45,6 +44,7 @@
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -203,7 +203,7 @@
     Set<Long> blockIds = Sets.newHashSet();
     int partitionIdx = 1;
     for (int i = 0; i < 5; i++) {
-      blockIds.add(ClientUtils.getBlockId(partitionIdx, 0, i));
+      blockIds.add(BlockId.getBlockId(i, partitionIdx, 0));
     }
     partitionToBlocks.put(partitionIdx, blockIds);
     serverToPartitionToBlockIds.put(shuffleServerInfo1, partitionToBlocks);
@@ -251,7 +251,7 @@
     Map<Integer, Set<Long>> partitionToBlocks1 = Maps.newHashMap();
     Set<Long> blockIds = Sets.newHashSet();
     for (int i = 0; i < 5; i++) {
-      blockIds.add(ClientUtils.getBlockId(1, 0, i));
+      blockIds.add(BlockId.getBlockId(i, 1, 0));
     }
     partitionToBlocks1.put(1, blockIds);
     Map<ShuffleServerInfo, Map<Integer, Set<Long>>> serverToPartitionToBlockIds = Maps.newHashMap();
@@ -260,7 +260,7 @@
     Map<Integer, Set<Long>> partitionToBlocks2 = Maps.newHashMap();
     blockIds = Sets.newHashSet();
     for (int i = 0; i < 7; i++) {
-      blockIds.add(ClientUtils.getBlockId(2, 0, i));
+      blockIds.add(BlockId.getBlockId(i, 2, 0));
     }
     partitionToBlocks2.put(2, blockIds);
     serverToPartitionToBlockIds.put(shuffleServerInfo2, partitionToBlocks2);
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index 2eb2ffa..65b699e 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -42,7 +42,7 @@
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServerConf;
@@ -105,7 +105,7 @@
     Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     createTestData(testAppId, expectedData, blockIdBitmap, taskIdBitmap);
-    blockIdBitmap.addLong((1 << Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+    blockIdBitmap.addLong(BlockId.getBlockId(0, 1, 0));
     ShuffleReadClientImpl readClient;
     readClient =
         baseReadBuilder()
@@ -252,7 +252,9 @@
     Roaring64NavigableMap wrongBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
     LongIterator iter = blockIdBitmap.getLongIterator();
     while (iter.hasNext()) {
-      wrongBlockIdBitmap.addLong(iter.next() + (1 << Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+      BlockId blockId = BlockId.fromLong(iter.next());
+      wrongBlockIdBitmap.addLong(
+          BlockId.getBlockId(blockId.sequenceNo, blockId.partitionId + 1, blockId.taskAttemptId));
     }
 
     ShuffleReadClientImpl readClient =
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 7977b80..c40003c 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -62,6 +62,7 @@
 import org.apache.uniffle.common.exception.NoRegisterException;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RssUtils;
@@ -573,11 +574,9 @@
       Set<Integer> requestPartitions,
       Roaring64NavigableMap bitmap,
       Roaring64NavigableMap resultBitmap) {
-    final long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
     bitmap.forEach(
         blockId -> {
-          int partitionId =
-              Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
+          int partitionId = BlockId.getPartitionId(blockId);
           if (requestPartitions.contains(partitionId)) {
             resultBitmap.addLong(blockId);
           }
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index fb1de11..426ef4b 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -54,6 +54,7 @@
 import org.apache.uniffle.common.exception.NoRegisterException;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.RssUtils;
@@ -714,7 +715,7 @@
     for (int taskId = 1; taskId < 10; taskId++) {
       for (int partitionId = 1; partitionId < 10; partitionId++) {
         for (int i = 0; i < 2; i++) {
-          long blockId = getBlockId(partitionId, taskId, i);
+          long blockId = BlockId.getBlockId(i, partitionId, taskId);
           bitmapBlockIds.addLong(blockId);
           if (partitionId == expectedPartitionId) {
             expectedBlockIds.addLong(blockId);
@@ -727,19 +728,19 @@
             Sets.newHashSet(expectedPartitionId), bitmapBlockIds, Roaring64NavigableMap.bitmapOf());
     assertEquals(expectedBlockIds, resultBlockIds);
 
-    bitmapBlockIds.addLong(getBlockId(0, 0, 0));
+    bitmapBlockIds.addLong(BlockId.getBlockId(0, 0, 0));
     resultBlockIds =
         shuffleTaskManager.getBlockIdsByPartitionId(
             Sets.newHashSet(0), bitmapBlockIds, Roaring64NavigableMap.bitmapOf());
     assertEquals(Roaring64NavigableMap.bitmapOf(0L), resultBlockIds);
 
     long expectedBlockId =
-        getBlockId(
-            Constants.MAX_PARTITION_ID, Constants.MAX_TASK_ATTEMPT_ID, Constants.MAX_SEQUENCE_NO);
+        BlockId.getBlockId(
+            Constants.MAX_SEQUENCE_NO, Constants.MAX_PARTITION_ID, Constants.MAX_TASK_ATTEMPT_ID);
     bitmapBlockIds.addLong(expectedBlockId);
     resultBlockIds =
         shuffleTaskManager.getBlockIdsByPartitionId(
-            Sets.newHashSet(Math.toIntExact(Constants.MAX_PARTITION_ID)),
+            Sets.newHashSet(Constants.MAX_PARTITION_ID),
             bitmapBlockIds,
             Roaring64NavigableMap.bitmapOf());
     assertEquals(Roaring64NavigableMap.bitmapOf(expectedBlockId), resultBlockIds);
@@ -757,7 +758,7 @@
     for (int taskId = 1; taskId < 10; taskId++) {
       for (int partitionId = 1; partitionId < 10; partitionId++) {
         for (int i = 0; i < 2; i++) {
-          long blockId = getBlockId(partitionId, taskId, i);
+          long blockId = BlockId.getBlockId(i, partitionId, taskId);
           bitmapBlockIds.addLong(blockId);
           if (partitionId >= startPartition && partitionId <= endPartition) {
             expectedBlockIds.addLong(blockId);
@@ -830,7 +831,7 @@
       long[] blockIds = new long[taskNum * blocksPerTask];
       for (int taskId = 0; taskId < taskNum; taskId++) {
         for (int i = 0; i < blocksPerTask; i++) {
-          long blockId = getBlockId(partitionId, taskId, i);
+          long blockId = BlockId.getBlockId(i, partitionId, taskId);
           blockIds[taskId * blocksPerTask + i] = blockId;
         }
       }
@@ -993,13 +994,6 @@
     return appIdsOnDisk;
   }
 
-  // copy from ClientUtils
-  private Long getBlockId(long partitionId, long taskAttemptId, long atomicInt) {
-    return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
-        + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
-        + taskAttemptId;
-  }
-
   private void waitForFlush(
       ShuffleFlushManager shuffleFlushManager, String appId, int shuffleId, int expectedBlockNum)
       throws Exception {
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/FileBasedShuffleSegment.java b/storage/src/main/java/org/apache/uniffle/storage/common/FileBasedShuffleSegment.java
index d4e25c4..38f3abb 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/FileBasedShuffleSegment.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/FileBasedShuffleSegment.java
@@ -19,6 +19,8 @@
 
 import java.util.Objects;
 
+import org.apache.uniffle.common.util.BlockId;
+
 public class FileBasedShuffleSegment extends ShuffleSegment
     implements Comparable<FileBasedShuffleSegment> {
 
@@ -119,9 +121,9 @@
         + uncompressLength
         + "], crc["
         + crc
-        + "], blockId["
-        + blockId
-        + "], taskAttemptId["
+        + "], "
+        + BlockId.toString(blockId)
+        + ", taskAttemptId["
         + taskAttemptId
         + "]}";
   }
diff --git a/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java b/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
index b79d18f..7b30493 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
@@ -21,7 +21,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
@@ -30,9 +30,9 @@
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.ByteBufUtils;
 import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
 import org.apache.uniffle.storage.handler.impl.HadoopFileReader;
 import org.apache.uniffle.storage.handler.impl.HadoopFileWriter;
@@ -43,7 +43,7 @@
 
 public class HadoopShuffleHandlerTestBase {
 
-  private static final AtomicLong ATOMIC_LONG = new AtomicLong(0);
+  private static final AtomicInteger ATOMIC_INT = new AtomicInteger(0);
 
   public static void writeTestData(
       HadoopShuffleWriteHandler writeHandler,
@@ -56,10 +56,7 @@
     for (int i = 0; i < num; i++) {
       byte[] buf = new byte[length];
       new Random().nextBytes(buf);
-      long blockId =
-          (ATOMIC_LONG.getAndIncrement()
-                  << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
-              + taskAttemptId;
+      long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), 0, taskAttemptId);
       blocks.add(
           new ShufflePartitionedBlock(
               length, length, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf));
@@ -84,10 +81,7 @@
     for (int i = 0; i < num; i++) {
       byte[] buf = new byte[length];
       new Random().nextBytes(buf);
-      long blockId =
-          (ATOMIC_LONG.getAndIncrement()
-                  << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
-              + taskAttemptId;
+      long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), 0, taskAttemptId);
       blocks.add(
           new ShufflePartitionedBlock(
               length, length, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf));
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
index ed38338..8b307a3 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
@@ -36,8 +36,8 @@
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.HadoopShuffleHandlerTestBase;
 import org.apache.uniffle.storage.HadoopTestBase;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
@@ -118,10 +118,7 @@
     List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
     byte[] buf = new byte[blockSize];
     new Random().nextBytes(buf);
-    long blockId =
-        (expectTotalBlockNum
-                << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
-            + taskAttemptId;
+    long blockId = BlockId.getBlockId(expectTotalBlockNum, 0, taskAttemptId);
     blocks.add(
         new ShufflePartitionedBlock(
             blockSize, blockSize, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf));
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
index e02387e..19a2905 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
@@ -22,7 +22,7 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -33,6 +33,7 @@
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.segment.FixedSizeSegmentSplitter;
+import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.ByteBufUtils;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
@@ -43,14 +44,14 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class LocalFileHandlerTestBase {
-  private static AtomicLong ATOMIC_LONG = new AtomicLong(0L);
+  private static AtomicInteger ATOMIC_INT = new AtomicInteger(0);
 
   public static List<ShufflePartitionedBlock> generateBlocks(int num, int length) {
     List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
     for (int i = 0; i < num; i++) {
       byte[] buf = new byte[length];
       new Random().nextBytes(buf);
-      long blockId = ATOMIC_LONG.incrementAndGet();
+      long blockId = BlockId.getBlockId(ATOMIC_INT.incrementAndGet(), 0, 100);
       blocks.add(
           new ShufflePartitionedBlock(
               length, length, ChecksumUtils.getCrc32(buf), blockId, 100, buf));