Hide the Roaring64NavigableMap implementation of block id sets behind the BlockIdSet interface
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
index f761c6e..0795a55 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
@@ -32,12 +32,12 @@
 import org.apache.spark.serializer.SerializationStream;
 import org.apache.spark.serializer.Serializer;
 import org.apache.spark.serializer.SerializerInstance;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.storage.HadoopTestBase;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -67,7 +67,7 @@
       int blockNum,
       int recordNum,
       Map<String, String> expectedData,
-      Roaring64NavigableMap blockIdBitmap,
+      BlockIdSet blockIdBitmap,
       String keyPrefix,
       Serializer serializer,
       int partitionID)
@@ -91,7 +91,7 @@
       int recordNum,
       BlockIdLayout layout,
       Map<String, String> expectedData,
-      Roaring64NavigableMap blockIdBitmap,
+      BlockIdSet blockIdBitmap,
       String keyPrefix,
       Serializer serializer,
       int partitionID)
@@ -114,7 +114,7 @@
       int blockNum,
       int recordNum,
       Map<String, String> expectedData,
-      Roaring64NavigableMap blockIdBitmap,
+      BlockIdSet blockIdBitmap,
       String keyPrefix,
       Serializer serializer,
       int partitionID,
@@ -139,7 +139,7 @@
       int recordNum,
       BlockIdLayout layout,
       Map<String, String> expectedData,
-      Roaring64NavigableMap blockIdBitmap,
+      BlockIdSet blockIdBitmap,
       String keyPrefix,
       Serializer serializer,
       int partitionID,
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
index 3f6993c..490bf62 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
@@ -48,6 +48,7 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
 import org.apache.uniffle.storage.util.StorageType;
@@ -82,7 +83,7 @@
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
 
     Map<String, String> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(
         writeHandler, 2, 5, layout, expectedData, blockIdBitmap, "key", KRYO_SERIALIZER, 0);
@@ -111,7 +112,7 @@
 
   private RssShuffleDataIterator getDataIterator(
       String basePath,
-      Roaring64NavigableMap blockIdBitmap,
+      BlockIdSet blockIdBitmap,
       Roaring64NavigableMap taskIdBitmap,
       List<ShuffleServerInfo> serverInfos) {
     return getDataIterator(basePath, blockIdBitmap, taskIdBitmap, serverInfos, true);
@@ -119,7 +120,7 @@
 
   private RssShuffleDataIterator getDataIterator(
       String basePath,
-      Roaring64NavigableMap blockIdBitmap,
+      BlockIdSet blockIdBitmap,
       Roaring64NavigableMap taskIdBitmap,
       List<ShuffleServerInfo> serverInfos,
       boolean compress) {
@@ -164,7 +165,7 @@
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);
 
     Map<String, String> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler1, 2, 5, expectedData, blockIdBitmap, "key1", KRYO_SERIALIZER, 0);
     writeTestData(writeHandler2, 2, 5, expectedData, blockIdBitmap, "key2", KRYO_SERIALIZER, 0);
@@ -205,7 +206,7 @@
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
 
     Map<String, String> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 2, 5, expectedData, blockIdBitmap, "key", KRYO_SERIALIZER, 0);
 
@@ -238,7 +239,7 @@
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
 
     Map<String, String> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 2, 5, expectedData, blockIdBitmap, "key", KRYO_SERIALIZER, 0);
 
@@ -264,7 +265,7 @@
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
 
     Map<String, String> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 2, 5, expectedData, blockIdBitmap, "key", KRYO_SERIALIZER, 0);
 
@@ -311,7 +312,7 @@
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);
 
     Map<String, String> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(
         writeHandler1, 2, 5, expectedData, blockIdBitmap, "key1", KRYO_SERIALIZER, 0, compress);
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 6f5b255..fadf085 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -75,6 +75,7 @@
 import org.apache.uniffle.common.exception.RssFetchFailedException;
 import org.apache.uniffle.common.rpc.GrpcServer;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.common.util.RssUtils;
@@ -573,7 +574,7 @@
       }
       Map<Integer, List<ShuffleServerInfo>> partitionToServers =
           shuffleHandleInfo.getAllPartitionServersForReader();
-      Roaring64NavigableMap blockIdBitmap =
+      BlockIdSet blockIdBitmap =
           getShuffleResult(
               clientType,
               Sets.newHashSet(partitionToServers.get(startPartition)),
@@ -790,7 +791,7 @@
     return shuffleIdToNumMapTasks.getOrDefault(shuffleId, 0);
   }
 
-  private Roaring64NavigableMap getShuffleResult(
+  private BlockIdSet getShuffleResult(
       String clientType,
       Set<ShuffleServerInfo> shuffleServerInfoSet,
       String appId,
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 8f5118e..a40fa4d 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -53,6 +53,7 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockIdSet;
 
 public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
 
@@ -70,7 +71,7 @@
   private String basePath;
   private int partitionNumPerRange;
   private int partitionNum;
-  private Roaring64NavigableMap blockIdBitmap;
+  private BlockIdSet blockIdBitmap;
   private Roaring64NavigableMap taskIdBitmap;
   private List<ShuffleServerInfo> shuffleServerInfoList;
   private Configuration hadoopConf;
@@ -85,7 +86,7 @@
       Configuration hadoopConf,
       int partitionNumPerRange,
       int partitionNum,
-      Roaring64NavigableMap blockIdBitmap,
+      BlockIdSet blockIdBitmap,
       Roaring64NavigableMap taskIdBitmap,
       RssConf rssConf,
       Map<Integer, List<ShuffleServerInfo>> partitionToServers) {
diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
index f09223b..54ee1a1 100644
--- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
+++ b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
@@ -38,6 +38,7 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
 import org.apache.uniffle.storage.util.StorageType;
 
@@ -58,7 +59,7 @@
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi.getId(), conf);
 
     Map<String, String> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 2, 5, expectedData, blockIdBitmap, "key", KRYO_SERIALIZER, 0);
 
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index f70e38a..46b5ffb 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -89,6 +89,7 @@
 import org.apache.uniffle.common.rpc.GrpcServer;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.common.util.RssUtils;
@@ -700,7 +701,7 @@
     Map<ShuffleServerInfo, Set<Integer>> serverToPartitions =
         getPartitionDataServers(shuffleHandleInfo, startPartition, endPartition);
     long start = System.currentTimeMillis();
-    Roaring64NavigableMap blockIdBitmap =
+    BlockIdSet blockIdBitmap =
         getShuffleResultForMultiPart(
             clientType,
             serverToPartitions,
@@ -1108,7 +1109,7 @@
     return !failedTaskIds.contains(taskId);
   }
 
-  private Roaring64NavigableMap getShuffleResultForMultiPart(
+  private BlockIdSet getShuffleResultForMultiPart(
       String clientType,
       Map<ShuffleServerInfo, Set<Integer>> serverToPartitions,
       String appId,
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 9b17634..446a359 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -55,6 +55,7 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockIdSet;
 
 import static org.apache.uniffle.common.util.Constants.DRIVER_HOST;
 
@@ -73,7 +74,7 @@
   private String taskId;
   private String basePath;
   private int partitionNum;
-  private Map<Integer, Roaring64NavigableMap> partitionToExpectBlocks;
+  private Map<Integer, BlockIdSet> partitionToExpectBlocks;
   private Roaring64NavigableMap taskIdBitmap;
   private Configuration hadoopConf;
   private int mapStartIndex;
@@ -92,7 +93,7 @@
       String basePath,
       Configuration hadoopConf,
       int partitionNum,
-      Map<Integer, Roaring64NavigableMap> partitionToExpectBlocks,
+      Map<Integer, BlockIdSet> partitionToExpectBlocks,
       Roaring64NavigableMap taskIdBitmap,
       ShuffleReadMetrics readMetrics,
       RssConf rssConf,
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
index aaff4cb..ec4b7c7 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
@@ -40,6 +40,7 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
 import org.apache.uniffle.storage.util.StorageType;
 
@@ -61,10 +62,10 @@
     final HadoopShuffleWriteHandler writeHandler1 =
         new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi.getId(), conf);
 
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     Map<String, String> expectedData = Maps.newHashMap();
-    final Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
+    final BlockIdSet blockIdBitmap1 = BlockIdSet.empty();
     writeTestData(writeHandler, 2, 5, expectedData, blockIdBitmap, "key", KRYO_SERIALIZER, 0);
 
     RssShuffleHandle<String, String, String> handleMock = mock(RssShuffleHandle.class);
@@ -87,7 +88,7 @@
     when(dependencyMock.keyOrdering()).thenReturn(Option.empty());
     when(dependencyMock.mapSideCombine()).thenReturn(false);
 
-    Map<Integer, Roaring64NavigableMap> partitionToExpectBlocks = Maps.newHashMap();
+    Map<Integer, BlockIdSet> partitionToExpectBlocks = Maps.newHashMap();
     partitionToExpectBlocks.put(0, blockIdBitmap);
     RssConf rssConf = new RssConf();
     rssConf.set(RssClientConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
diff --git a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index 7d8f533..c90950d 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -22,8 +22,6 @@
 import java.util.Set;
 import java.util.function.Supplier;
 
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
-
 import org.apache.uniffle.client.PartitionDataReplicaRequirementTracking;
 import org.apache.uniffle.client.response.SendShuffleDataResult;
 import org.apache.uniffle.common.PartitionRange;
@@ -32,6 +30,7 @@
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.BlockIdSet;
 
 public interface ShuffleWriteClient {
 
@@ -92,14 +91,14 @@
       int assignmentShuffleServerNumber,
       int estimateTaskConcurrency);
 
-  Roaring64NavigableMap getShuffleResult(
+  BlockIdSet getShuffleResult(
       String clientType,
       Set<ShuffleServerInfo> shuffleServerInfoSet,
       String appId,
       int shuffleId,
       int partitionId);
 
-  Roaring64NavigableMap getShuffleResultForMultiPart(
+  BlockIdSet getShuffleResultForMultiPart(
       String clientType,
       Map<ShuffleServerInfo, Set<Integer>> serverToPartitions,
       String appId,
diff --git a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index 0eed01e..e4461a3 100644
--- a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++ b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -30,6 +30,7 @@
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.IdHelper;
 
 public class ShuffleClientFactory {
@@ -199,7 +200,7 @@
     private String basePath;
     private int partitionNumPerRange;
     private int partitionNum;
-    private Roaring64NavigableMap blockIdBitmap;
+    private BlockIdSet blockIdBitmap;
     private Roaring64NavigableMap taskIdBitmap;
     private List<ShuffleServerInfo> shuffleServerInfoList;
     private Configuration hadoopConf;
@@ -245,7 +246,7 @@
       return this;
     }
 
-    public ReadClientBuilder blockIdBitmap(Roaring64NavigableMap blockIdBitmap) {
+    public ReadClientBuilder blockIdBitmap(BlockIdSet blockIdBitmap) {
       this.blockIdBitmap = blockIdBitmap;
       return this;
     }
@@ -348,7 +349,7 @@
       return basePath;
     }
 
-    public Roaring64NavigableMap getBlockIdBitmap() {
+    public BlockIdSet getBlockIdBitmap() {
       return blockIdBitmap;
     }
 
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index e1aa0f9..64246c4 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -43,6 +43,7 @@
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssFetchFailedException;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.common.util.IdHelper;
 import org.apache.uniffle.common.util.RssUtils;
@@ -58,10 +59,10 @@
   private int partitionId;
   private ByteBuffer readBuffer;
   private ShuffleDataResult sdr;
-  private Roaring64NavigableMap blockIdBitmap;
+  private BlockIdSet blockIdBitmap;
   private Roaring64NavigableMap taskIdBitmap;
-  private Roaring64NavigableMap pendingBlockIds;
-  private Roaring64NavigableMap processedBlockIds = Roaring64NavigableMap.bitmapOf();
+  private BlockIdSet pendingBlockIds;
+  private BlockIdSet processedBlockIds = BlockIdSet.empty();
   private Queue<BufferSegment> bufferSegmentQueue = Queues.newLinkedBlockingQueue();
   private AtomicLong readDataTime = new AtomicLong(0);
   private AtomicLong copyTime = new AtomicLong(0);
@@ -188,12 +189,10 @@
           }
         });
 
-    for (long rid : removeBlockIds) {
-      blockIdBitmap.removeLong(rid);
-    }
+    blockIdBitmap.removeAll(removeBlockIds.stream());
 
     // copy blockIdBitmap to track all pending blocks
-    pendingBlockIds = RssUtils.cloneBitMap(blockIdBitmap);
+    pendingBlockIds = blockIdBitmap.copy();
 
     clientReadHandler = ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
   }
@@ -266,16 +265,16 @@
           }
 
           // mark block as processed
-          processedBlockIds.addLong(bs.getBlockId());
-          pendingBlockIds.removeLong(bs.getBlockId());
+          processedBlockIds.add(bs.getBlockId());
+          pendingBlockIds.remove(bs.getBlockId());
           // only update the statistics of necessary blocks
           clientReadHandler.updateConsumedBlockInfo(bs, false);
           break;
         }
         clientReadHandler.updateConsumedBlockInfo(bs, true);
         // mark block as processed
-        processedBlockIds.addLong(bs.getBlockId());
-        pendingBlockIds.removeLong(bs.getBlockId());
+        processedBlockIds.add(bs.getBlockId());
+        pendingBlockIds.remove(bs.getBlockId());
       }
 
       if (bs != null) {
@@ -289,7 +288,7 @@
   }
 
   @VisibleForTesting
-  protected Roaring64NavigableMap getProcessedBlockIds() {
+  protected BlockIdSet getProcessedBlockIds() {
     return processedBlockIds;
   }
 
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index b9e523b..f721594 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -39,7 +39,6 @@
 import com.google.common.collect.Sets;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,6 +91,7 @@
 import org.apache.uniffle.common.exception.RssFetchFailedException;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 
@@ -768,7 +768,7 @@
   }
 
   @Override
-  public Roaring64NavigableMap getShuffleResult(
+  public BlockIdSet getShuffleResult(
       String clientType,
       Set<ShuffleServerInfo> shuffleServerInfoSet,
       String appId,
@@ -777,7 +777,7 @@
     RssGetShuffleResultRequest request =
         new RssGetShuffleResultRequest(appId, shuffleId, partitionId, blockIdLayout);
     boolean isSuccessful = false;
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     int successCnt = 0;
     for (ShuffleServerInfo ssi : shuffleServerInfoSet) {
       try {
@@ -785,8 +785,8 @@
             getShuffleServerClient(ssi).getShuffleResult(request);
         if (response.getStatusCode() == StatusCode.SUCCESS) {
           // merge into blockIds from multiple servers.
-          Roaring64NavigableMap blockIdBitmapOfServer = response.getBlockIdBitmap();
-          blockIdBitmap.or(blockIdBitmapOfServer);
+          BlockIdSet blockIdBitmapOfServer = response.getBlockIdBitmap();
+          blockIdBitmap.addAll(blockIdBitmapOfServer);
           successCnt++;
           if (successCnt >= replicaRead) {
             isSuccessful = true;
@@ -812,14 +812,14 @@
   }
 
   @Override
-  public Roaring64NavigableMap getShuffleResultForMultiPart(
+  public BlockIdSet getShuffleResultForMultiPart(
       String clientType,
       Map<ShuffleServerInfo, Set<Integer>> serverToPartitions,
       String appId,
       int shuffleId,
       Set<Integer> failedPartitions,
       PartitionDataReplicaRequirementTracking replicaRequirementTracking) {
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Set<Integer> allRequestedPartitionIds = new HashSet<>();
     for (Map.Entry<ShuffleServerInfo, Set<Integer>> entry : serverToPartitions.entrySet()) {
       ShuffleServerInfo shuffleServerInfo = entry.getKey();
@@ -838,8 +838,8 @@
             getShuffleServerClient(shuffleServerInfo).getShuffleResultForMultiPart(request);
         if (response.getStatusCode() == StatusCode.SUCCESS) {
           // merge into blockIds from multiple servers.
-          Roaring64NavigableMap blockIdBitmapOfServer = response.getBlockIdBitmap();
-          blockIdBitmap.or(blockIdBitmapOfServer);
+          BlockIdSet blockIdBitmapOfServer = response.getBlockIdBitmap();
+          blockIdBitmap.addAll(blockIdBitmapOfServer);
           for (Integer partitionId : requestPartitions) {
             replicaRequirementTracking.markPartitionOfServerSuccessful(
                 partitionId, shuffleServerInfo);
diff --git a/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java b/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
index dd9ef62..fe3da70 100644
--- a/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
@@ -34,6 +34,7 @@
 import org.apache.uniffle.client.util.ClientUtils;
 import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.RssUtils;
 
 import static org.apache.uniffle.client.util.ClientUtils.waitUntilDoneOrFail;
@@ -49,14 +50,14 @@
   public void testGenerateTaskIdBitMap() {
     int partitionId = 1;
     BlockIdLayout layout = BlockIdLayout.DEFAULT;
-    Roaring64NavigableMap blockIdMap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdMap = BlockIdSet.empty();
     int taskSize = 10;
     long[] except = new long[taskSize];
     for (int i = 0; i < taskSize; i++) {
       except[i] = i;
       for (int j = 0; j < 100; j++) {
         long blockId = layout.getBlockId(j, partitionId, i);
-        blockIdMap.addLong(blockId);
+        blockIdMap.add(blockId);
       }
     }
     Roaring64NavigableMap taskIdBitMap =
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
index 5d0d4b4..eb4f94f 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
@@ -18,6 +18,7 @@
 package org.apache.uniffle.client.impl;
 
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -30,7 +31,6 @@
 import org.junit.jupiter.api.Test;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
-import org.roaringbitmap.longlong.LongIterator;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.client.TestUtils;
@@ -43,6 +43,7 @@
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.storage.HadoopTestBase;
 import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
@@ -84,7 +85,7 @@
         new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 2, 30, 1, 0, expectedData, blockIdBitmap);
     ShuffleReadClientImpl readClient =
@@ -99,7 +100,7 @@
     readClient.close();
 
     BlockIdLayout layout = BlockIdLayout.DEFAULT;
-    blockIdBitmap.addLong(layout.getBlockId(0, 0, layout.maxTaskAttemptId - 1));
+    blockIdBitmap.add(layout.getBlockId(0, 0, layout.maxTaskAttemptId - 1));
     taskIdBitmap.addLong(layout.maxTaskAttemptId - 1);
     readClient =
         baseReadBuilder()
@@ -129,7 +130,7 @@
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler1, 2, 30, 0, 0, expectedData, blockIdBitmap);
     writeTestData(writeHandler2, 2, 30, 0, 0, expectedData, blockIdBitmap);
@@ -157,7 +158,7 @@
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    final BlockIdSet blockIdBitmap = BlockIdSet.empty();
     final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler1, 2, 30, 0, 0, expectedData, blockIdBitmap);
     writeTestData(writeHandler2, 2, 30, 0, 0, expectedData, blockIdBitmap);
@@ -214,7 +215,7 @@
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap);
 
@@ -254,7 +255,7 @@
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap);
     ShuffleReadClientImpl readClient =
@@ -282,11 +283,11 @@
 
     Map<Long, byte[]> expectedData1 = Maps.newHashMap();
     Map<Long, byte[]> expectedData2 = Maps.newHashMap();
-    final Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
+    final BlockIdSet blockIdBitmap1 = BlockIdSet.empty();
     final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 10, 30, 0, 0, expectedData1, blockIdBitmap1);
 
-    Roaring64NavigableMap blockIdBitmap2 = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap2 = BlockIdSet.empty();
     writeTestData(writeHandler, 10, 30, 0, 0, expectedData2, blockIdBitmap2);
 
     writeTestData(writeHandler, 10, 30, 0, 0, expectedData1, blockIdBitmap1);
@@ -323,7 +324,7 @@
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap);
     ShuffleReadClientImpl readClient =
@@ -375,7 +376,7 @@
             .partitionId(0)
             .partitionNumPerRange(2)
             .basePath("basePath")
-            .blockIdBitmap(Roaring64NavigableMap.bitmapOf())
+            .blockIdBitmap(BlockIdSet.empty())
             .taskIdBitmap(Roaring64NavigableMap.bitmapOf())
             .build();
     assertNull(readClient.readShuffleBlockData());
@@ -390,14 +391,14 @@
 
     BlockIdLayout layout = BlockIdLayout.DEFAULT;
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 5, 30, 0, 0, expectedData, blockIdBitmap);
-    Roaring64NavigableMap wrongBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
-    LongIterator iter = blockIdBitmap.getLongIterator();
+    BlockIdSet wrongBlockIdBitmap = BlockIdSet.empty();
+    Iterator<Long> iter = blockIdBitmap.stream().iterator();
     while (iter.hasNext()) {
       BlockId blockId = layout.asBlockId(iter.next());
-      wrongBlockIdBitmap.addLong(
+      wrongBlockIdBitmap.add(
           layout.getBlockId(blockId.sequenceNo, blockId.partitionId + 1, blockId.taskAttemptId));
     }
 
@@ -425,7 +426,7 @@
         new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 10, 30, 1, 0, expectedData, blockIdBitmap);
     // test with different indexReadLimit to validate result
@@ -497,7 +498,7 @@
         new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    final BlockIdSet blockIdBitmap = BlockIdSet.empty();
     final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 1);
     writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
     writeTestData(writeHandler, 5, 30, 1, 2, Maps.newHashMap(), blockIdBitmap);
@@ -539,12 +540,11 @@
         new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    final BlockIdSet blockIdBitmap = BlockIdSet.empty();
     final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 3);
     writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap, layout);
     // test case: data generated by speculation task without report result
-    writeTestData(
-        writeHandler, 5, 30, 1, 1, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf(), layout);
+    writeTestData(writeHandler, 5, 30, 1, 1, Maps.newHashMap(), BlockIdSet.empty(), layout);
     // test case: data generated by speculation task with report result
     writeTestData(writeHandler, 5, 30, 1, 2, Maps.newHashMap(), blockIdBitmap, layout);
     writeTestData(writeHandler, 5, 30, 1, 3, expectedData, blockIdBitmap, layout);
@@ -597,10 +597,10 @@
         new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    final BlockIdSet blockIdBitmap = BlockIdSet.empty();
     final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 2);
     writeDuplicatedData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
-    writeTestData(writeHandler, 5, 30, 1, 1, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
+    writeTestData(writeHandler, 5, 30, 1, 1, Maps.newHashMap(), BlockIdSet.empty());
     writeTestData(writeHandler, 5, 30, 1, 2, expectedData, blockIdBitmap);
 
     // unexpected taskAttemptId should be filtered
@@ -624,14 +624,14 @@
         new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    final BlockIdSet blockIdBitmap = BlockIdSet.empty();
     final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
     writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
-    writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
-    writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
+    writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(), BlockIdSet.empty());
+    writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(), BlockIdSet.empty());
     writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
-    writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
+    writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(), BlockIdSet.empty());
     // unexpected taskAttemptId should be filtered
     ShuffleReadClientImpl readClient =
         baseReadBuilder()
@@ -656,8 +656,8 @@
 
     Map<Long, byte[]> expectedData0 = Maps.newHashMap();
     Map<Long, byte[]> expectedData1 = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap0 = Roaring64NavigableMap.bitmapOf();
-    Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap0 = BlockIdSet.empty();
+    BlockIdSet blockIdBitmap1 = BlockIdSet.empty();
     writeTestData(writeHandler0, 2, 30, 0, 0, expectedData0, blockIdBitmap0);
     writeTestData(writeHandler1, 2, 30, 1, 1, expectedData1, blockIdBitmap1);
 
@@ -695,7 +695,7 @@
       int partitionId,
       long taskAttemptId,
       Map<Long, byte[]> expectedData,
-      Roaring64NavigableMap blockIdBitmap,
+      BlockIdSet blockIdBitmap,
       BlockIdLayout layout)
       throws Exception {
     List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
@@ -707,7 +707,7 @@
           new ShufflePartitionedBlock(
               length, length, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf));
       expectedData.put(blockId, buf);
-      blockIdBitmap.addLong(blockId);
+      blockIdBitmap.add(blockId);
     }
     writeHandler.write(blocks);
   }
@@ -719,7 +719,7 @@
       int partitionId,
       long taskAttemptId,
       Map<Long, byte[]> expectedData,
-      Roaring64NavigableMap blockIdBitmap)
+      BlockIdSet blockIdBitmap)
       throws Exception {
     writeTestData(
         writeHandler,
@@ -739,7 +739,7 @@
       int partitionId,
       long taskAttemptId,
       Map<Long, byte[]> expectedData,
-      Roaring64NavigableMap blockIdBitmap)
+      BlockIdSet blockIdBitmap)
       throws Exception {
     BlockIdLayout layout = BlockIdLayout.DEFAULT;
     List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
@@ -753,7 +753,7 @@
       blocks.add(spb);
       blocks.add(spb);
       expectedData.put(blockId, buf);
-      blockIdBitmap.addLong(blockId);
+      blockIdBitmap.add(blockId);
     }
     writeHandler.write(blocks);
   }
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
index d26e969..74a8ca4 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
@@ -49,6 +49,7 @@
 import org.apache.uniffle.common.netty.IOMode;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.RssUtils;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -374,8 +375,7 @@
 
     Set<ShuffleServerInfo> shuffleServerInfoSet =
         Sets.newHashSet(new ShuffleServerInfo("id", "host", 0));
-    Roaring64NavigableMap result =
-        spyClient.getShuffleResult("GRPC", shuffleServerInfoSet, "appId", 1, 2);
+    BlockIdSet result = spyClient.getShuffleResult("GRPC", shuffleServerInfoSet, "appId", 1, 2);
 
     verify(mockShuffleServerClient)
         .getShuffleResult(argThat(request -> request.getBlockIdLayout().equals(layout)));
diff --git a/common/src/main/java/org/apache/uniffle/common/util/BlockIdSet.java b/common/src/main/java/org/apache/uniffle/common/util/BlockIdSet.java
new file mode 100644
index 0000000..2847fff
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/util/BlockIdSet.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.function.LongConsumer;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+
+/** A set of {@code long} block ids. Implementations are thread-safe. */
+public interface BlockIdSet {
+  BlockIdSet add(long blockId);
+
+  default BlockIdSet addAll(BlockIdSet blockIds) {
+    return addAll(blockIds.stream()); // stream() is a LongStream, so the primitive overload runs
+  }
+
+  default BlockIdSet addAll(Stream<Long> blockIds) {
+    synchronized (this) { // hold this set's monitor for the whole batch
+      blockIds.forEach(this::add);
+    }
+    return this; // returning this set allows call chaining
+  }
+
+  default BlockIdSet addAll(LongStream blockIds) {
+    synchronized (this) { // hold this set's monitor for the whole batch
+      blockIds.forEach(this::add);
+    }
+    return this;
+  }
+
+  BlockIdSet remove(long blockId);
+
+  default BlockIdSet removeAll(BlockIdSet blockIds) {
+    return removeAll(blockIds.stream()); // stream() is a LongStream, so the primitive overload runs
+  }
+
+  default BlockIdSet removeAll(Stream<Long> blockIds) {
+    synchronized (this) { // hold this set's monitor for the whole batch
+      blockIds.forEach(this::remove);
+    }
+    return this;
+  }
+
+  default BlockIdSet removeAll(LongStream blockIds) {
+    synchronized (this) { // hold this set's monitor for the whole batch
+      blockIds.forEach(this::remove);
+    }
+    return this;
+  }
+
+  BlockIdSet retainAll(BlockIdSet blockIds); // presumably in-place intersection; confirm per impl
+
+  boolean contains(long blockId);
+
+  boolean containsAll(BlockIdSet blockIds);
+
+  int getIntCardinality();
+
+  long getLongCardinality();
+
+  boolean isEmpty();
+
+  void forEach(LongConsumer func);
+
+  LongStream stream();
+
+  BlockIdSet copy();
+
+  byte[] serialize() throws IOException;
+
+  // create new empty instance using default implementation (RoaringBitmapBlockIdSet)
+  static BlockIdSet empty() {
+    return new RoaringBitmapBlockIdSet();
+  }
+
+  // create new instance from given block ids using default implementation (see empty())
+  static BlockIdSet of(long... blockIds) {
+    BlockIdSet set = empty();
+    set.addAll(Arrays.stream(blockIds)); // long[] gives a LongStream: ids are added unboxed
+    return set;
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RoaringBitmapBlockIdSet.java b/common/src/main/java/org/apache/uniffle/common/util/RoaringBitmapBlockIdSet.java
new file mode 100644
index 0000000..309c655
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/util/RoaringBitmapBlockIdSet.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.io.IOException;
+import java.util.function.LongConsumer;
+import java.util.stream.LongStream;
+
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+/**
+ * {@link BlockIdSet} backed by a {@link Roaring64NavigableMap}. Apart from streaming, instance
+ * methods synchronize on this set; set arguments must be instances of this class as well.
+ */
+public class RoaringBitmapBlockIdSet implements BlockIdSet {
+  public final Roaring64NavigableMap bitmap;
+
+  public RoaringBitmapBlockIdSet() {
+    this(Roaring64NavigableMap.bitmapOf());
+  }
+
+  /** Wraps the given bitmap without copying it. */
+  public RoaringBitmapBlockIdSet(Roaring64NavigableMap bitmap) {
+    this.bitmap = bitmap;
+  }
+
+  /** Returns the bitmap behind {@code blockIds}, or throws if it is another implementation. */
+  private static Roaring64NavigableMap unwrap(BlockIdSet blockIds) {
+    if (!(blockIds instanceof RoaringBitmapBlockIdSet)) {
+      throw new UnsupportedOperationException(
+          "Only supported for RoaringBitmapBlockIdSet arguments");
+    }
+    return ((RoaringBitmapBlockIdSet) blockIds).bitmap;
+  }
+
+  @Override
+  public synchronized BlockIdSet add(long blockId) {
+    bitmap.addLong(blockId);
+    return this;
+  }
+
+  /** In-place union via a bitwise OR of the two bitmaps. */
+  @Override
+  public synchronized BlockIdSet addAll(BlockIdSet blockIds) {
+    bitmap.or(unwrap(blockIds));
+    return this;
+  }
+
+  @Override
+  public synchronized BlockIdSet remove(long blockId) {
+    bitmap.removeLong(blockId);
+    return this;
+  }
+
+  /** In-place difference via a bitwise AND-NOT of the two bitmaps. */
+  @Override
+  public synchronized BlockIdSet removeAll(BlockIdSet blockIds) {
+    bitmap.andNot(unwrap(blockIds));
+    return this;
+  }
+
+  /** In-place intersection via a bitwise AND of the two bitmaps. */
+  @Override
+  public synchronized BlockIdSet retainAll(BlockIdSet blockIds) {
+    bitmap.and(unwrap(blockIds));
+    return this;
+  }
+
+  @Override
+  public synchronized boolean contains(long blockId) {
+    return bitmap.contains(blockId);
+  }
+
+  /** Returns true when every id of {@code blockIds} is also in this set. */
+  @Override
+  public synchronized boolean containsAll(BlockIdSet blockIds) {
+    Roaring64NavigableMap expecteds = unwrap(blockIds);
+    // quick check first: equal bitmaps need no bitwise AND
+    if (bitmap.equals(expecteds)) {
+      return true;
+    }
+
+    // not equal: intersect a copy with the expected ids and see whether all of them survive
+    Roaring64NavigableMap actuals = RssUtils.cloneBitMap(bitmap);
+    actuals.and(expecteds);
+    return actuals.equals(expecteds);
+  }
+
+  /** Equal only to another RoaringBitmapBlockIdSet holding the same ids; never throws. */
+  @Override
+  public synchronized boolean equals(Object other) {
+    return other instanceof RoaringBitmapBlockIdSet
+        && bitmap.equals(((RoaringBitmapBlockIdSet) other).bitmap);
+  }
+
+  /** Hashes the bitmap so that equal sets share a hash code, matching {@code equals}. */
+  @Override
+  public synchronized int hashCode() {
+    return bitmap.hashCode();
+  }
+
+  @Override
+  public synchronized int getIntCardinality() {
+    return bitmap.getIntCardinality();
+  }
+
+  @Override
+  public synchronized long getLongCardinality() {
+    return bitmap.getLongCardinality();
+  }
+
+  @Override
+  public synchronized boolean isEmpty() {
+    return bitmap.isEmpty();
+  }
+
+  @Override
+  public synchronized void forEach(LongConsumer func) {
+    bitmap.forEach(func::accept);
+  }
+
+  /** Streams straight from the bitmap; the monitor is not held while the stream is consumed. */
+  @Override
+  public LongStream stream() {
+    return bitmap.stream();
+  }
+
+  @Override
+  public synchronized BlockIdSet copy() {
+    return new RoaringBitmapBlockIdSet(RssUtils.cloneBitMap(bitmap));
+  }
+
+  @Override
+  public synchronized byte[] serialize() throws IOException {
+    return RssUtils.serializeBitMap(bitmap);
+  }
+
+  @Override
+  public synchronized String toString() {
+    return bitmap.toString();
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 5c65f5e..e652e84 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -336,16 +336,13 @@
     return hostName.replaceAll("[\\.-]", "_");
   }
 
-  public static Map<Integer, Roaring64NavigableMap> generatePartitionToBitmap(
-      Roaring64NavigableMap shuffleBitmap,
-      int startPartition,
-      int endPartition,
-      BlockIdLayout layout) {
-    Map<Integer, Roaring64NavigableMap> result = Maps.newHashMap();
+  public static Map<Integer, BlockIdSet> generatePartitionToBitmap(
+      BlockIdSet shuffleBitmap, int startPartition, int endPartition, BlockIdLayout layout) {
+    Map<Integer, BlockIdSet> result = Maps.newHashMap();
     for (int partitionId = startPartition; partitionId < endPartition; partitionId++) {
-      result.computeIfAbsent(partitionId, key -> Roaring64NavigableMap.bitmapOf());
+      result.computeIfAbsent(partitionId, key -> BlockIdSet.empty());
     }
-    Iterator<Long> it = shuffleBitmap.iterator();
+    Iterator<Long> it = shuffleBitmap.stream().iterator();
     while (it.hasNext()) {
       Long blockId = it.next();
       int partitionId = layout.getPartitionId(blockId);
@@ -373,33 +370,23 @@
   }
 
   public static void checkProcessedBlockIds(
-      Roaring64NavigableMap blockIdBitmap, Roaring64NavigableMap processedBlockIds) {
+      BlockIdSet blockIdBitmap, BlockIdSet processedBlockIds) {
     // processedBlockIds can be a superset of blockIdBitmap,
-    // here we check that processedBlockIds has all bits of blockIdBitmap set
-    // first a quick check:
-    //   we only need to do the bitwise AND when blockIdBitmap is not equal to processedBlockIds
-    if (!blockIdBitmap.equals(processedBlockIds)) {
-      Roaring64NavigableMap cloneBitmap;
-      cloneBitmap = RssUtils.cloneBitMap(blockIdBitmap);
-      cloneBitmap.and(processedBlockIds);
-      if (!blockIdBitmap.equals(cloneBitmap)) {
-        throw new RssException(
-            "Blocks read inconsistent: expected "
-                + blockIdBitmap.getLongCardinality()
-                + " blocks, actual "
-                + cloneBitmap.getLongCardinality()
-                + " blocks");
-      }
+    // here we check that processedBlockIds contains all of blockIdBitmap
+    if (!processedBlockIds.containsAll(blockIdBitmap)) {
+      throw new RssException(
+          "Blocks read inconsistent: expected "
+              + blockIdBitmap.getLongCardinality()
+              + " blocks, actual "
+              + processedBlockIds.copy().retainAll(blockIdBitmap).getLongCardinality()
+              + " blocks");
     }
   }
 
   public static Roaring64NavigableMap generateTaskIdBitMap(
-      Roaring64NavigableMap blockIdBitmap, IdHelper idHelper) {
-    Iterator<Long> iterator = blockIdBitmap.iterator();
+      BlockIdSet blockIdBitmap, IdHelper idHelper) {
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
-    while (iterator.hasNext()) {
-      taskIdBitmap.addLong(idHelper.getTaskAttemptId(iterator.next()));
-    }
+    blockIdBitmap.forEach(blockId -> taskIdBitmap.addLong(idHelper.getTaskAttemptId(blockId)));
     return taskIdBitmap;
   }
 
diff --git a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index d9bfddb..2c6a828 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -237,23 +237,23 @@
   @ParameterizedTest
   @MethodSource("testBlockIdLayouts")
   public void testShuffleBitmapToPartitionBitmap(BlockIdLayout layout) {
-    Roaring64NavigableMap partition1Bitmap =
-        Roaring64NavigableMap.bitmapOf(
+    BlockIdSet partition1Bitmap =
+        BlockIdSet.of(
             layout.getBlockId(0, 0, 0),
             layout.getBlockId(1, 0, 0),
             layout.getBlockId(0, 0, 1),
             layout.getBlockId(1, 0, 1));
-    Roaring64NavigableMap partition2Bitmap =
-        Roaring64NavigableMap.bitmapOf(
+    BlockIdSet partition2Bitmap =
+        BlockIdSet.of(
             layout.getBlockId(0, 1, 0),
             layout.getBlockId(1, 1, 0),
             layout.getBlockId(0, 1, 1),
             layout.getBlockId(1, 1, 1));
-    Roaring64NavigableMap shuffleBitmap = Roaring64NavigableMap.bitmapOf();
-    shuffleBitmap.or(partition1Bitmap);
-    shuffleBitmap.or(partition2Bitmap);
+    BlockIdSet shuffleBitmap = BlockIdSet.empty();
+    shuffleBitmap.addAll(partition1Bitmap);
+    shuffleBitmap.addAll(partition2Bitmap);
     assertEquals(8, shuffleBitmap.getLongCardinality());
-    Map<Integer, Roaring64NavigableMap> toPartitionBitmap =
+    Map<Integer, BlockIdSet> toPartitionBitmap =
         RssUtils.generatePartitionToBitmap(shuffleBitmap, 0, 2, layout);
     assertEquals(2, toPartitionBitmap.size());
     assertEquals(partition1Bitmap, toPartitionBitmap.get(0));
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
index 4029d1c..40ee1a2 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
@@ -53,6 +53,7 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.CoordinatorServer;
 import org.apache.uniffle.server.ShuffleServer;
@@ -166,7 +167,7 @@
     String appId = "ap_disk_error_data";
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     Set<Long> expectedBlock1 = Sets.newHashSet();
-    Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap1 = BlockIdSet.empty();
     List<ShuffleBlockInfo> blocks1 =
         createShuffleBlockList(0, 0, 1, 3, 25, blockIdBitmap1, expectedData);
     RssRegisterShuffleRequest rr1 =
@@ -212,7 +213,7 @@
     expectedData.clear();
     partitionToBlocks.clear();
     shuffleToBlocks.clear();
-    Roaring64NavigableMap blockIdBitmap2 = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap2 = BlockIdSet.empty();
     Set<Long> expectedBlock2 = Sets.newHashSet();
     List<ShuffleBlockInfo> blocks2 =
         createShuffleBlockList(0, 0, 2, 5, 30, blockIdBitmap2, expectedData);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java
index 8a2fe26..c26f015 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java
@@ -51,6 +51,7 @@
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;
 
@@ -97,7 +98,7 @@
     Map<Integer, List<Integer>> map = Maps.newHashMap();
     map.put(0, Lists.newArrayList(0));
     registerShuffle(appId, map, isNettyMode);
-    Roaring64NavigableMap blockBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockBitmap = BlockIdSet.empty();
     final List<ShuffleBlockInfo> blocks =
         createShuffleBlockList(0, 0, 0, 40, 2 * 1024 * 1024, blockBitmap, expectedData);
     makeChaos();
@@ -156,7 +157,7 @@
       String appId,
       int shuffleId,
       int partitionId,
-      Roaring64NavigableMap blockBitmap,
+      BlockIdSet blockBitmap,
       Roaring64NavigableMap taskBitmap,
       Map<Long, byte[]> expectedData,
       boolean isNettyMode) {
@@ -186,11 +187,11 @@
             .hadoopConf(conf)
             .build();
     CompressedShuffleBlock csb = readClient.readShuffleBlockData();
-    Roaring64NavigableMap matched = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet matched = BlockIdSet.empty();
     while (csb != null && csb.getByteBuffer() != null) {
       for (Map.Entry<Long, byte[]> entry : expectedData.entrySet()) {
         if (compareByte(entry.getValue(), csb.getByteBuffer())) {
-          matched.addLong(entry.getKey());
+          matched.add(entry.getKey());
           break;
         }
       }
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 6662a36..bac3d60 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -45,6 +45,7 @@
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.ServerType;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.CoordinatorServer;
@@ -249,7 +250,7 @@
     String testAppId = "rpcFailedTest";
     registerShuffleServer(testAppId, 3, 2, 2, true);
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
 
     // case1: When only 1 server is failed, the block sending should success
     List<ShuffleBlockInfo> blocks =
@@ -264,13 +265,13 @@
             Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, fakedShuffleServerInfo2));
 
     SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
-    Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
-    Roaring64NavigableMap succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet failedBlockIdBitmap = BlockIdSet.empty();
+    BlockIdSet succBlockIdBitmap = BlockIdSet.empty();
     for (Long blockId : result.getSuccessBlockIds()) {
-      succBlockIdBitmap.addLong(blockId);
+      succBlockIdBitmap.add(blockId);
     }
     for (Long blockId : result.getFailedBlockIds()) {
-      failedBlockIdBitmap.addLong(blockId);
+      failedBlockIdBitmap.add(blockId);
     }
     assertEquals(0, failedBlockIdBitmap.getLongCardinality());
     assertEquals(blockIdBitmap, succBlockIdBitmap);
@@ -289,7 +290,7 @@
     validateResult(readClient, expectedData);
 
     // case2: When 2 servers are failed, the block sending should fail
-    blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    blockIdBitmap = BlockIdSet.empty();
     blocks =
         createShuffleBlockList(
             0,
@@ -302,13 +303,13 @@
             Lists.newArrayList(
                 shuffleServerInfo0, fakedShuffleServerInfo1, fakedShuffleServerInfo2));
     result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
-    failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
-    succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    failedBlockIdBitmap = BlockIdSet.empty();
+    succBlockIdBitmap = BlockIdSet.empty();
     for (Long blockId : result.getSuccessBlockIds()) {
-      succBlockIdBitmap.addLong(blockId);
+      succBlockIdBitmap.add(blockId);
     }
     for (Long blockId : result.getFailedBlockIds()) {
-      failedBlockIdBitmap.addLong(blockId);
+      failedBlockIdBitmap.add(blockId);
     }
     assertEquals(blockIdBitmap, failedBlockIdBitmap);
     assertEquals(0, succBlockIdBitmap.getLongCardinality());
@@ -379,7 +380,7 @@
   public void case1() throws Exception {
     String testAppId = "case1";
     registerShuffleServer(testAppId, 3, 2, 2, true);
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
 
     // only 1 server is timout, the block sending should success
     enableTimeout((MockedShuffleServer) grpcShuffleServers.get(2), 500);
@@ -392,7 +393,7 @@
     serverToPartitionToBlockIds.put(shuffleServerInfo1, partitionToBlockIds);
     serverToPartitionToBlockIds.put(shuffleServerInfo2, partitionToBlockIds);
     shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, testAppId, 0, 0L, 1);
-    Roaring64NavigableMap report =
+    BlockIdSet report =
         shuffleWriteClientImpl.getShuffleResult(
             "GRPC",
             Sets.newHashSet(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
@@ -414,9 +415,9 @@
             expectedData,
             Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
     SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
-    Roaring64NavigableMap succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet succBlockIdBitmap = BlockIdSet.empty();
     for (Long blockId : result.getSuccessBlockIds()) {
-      succBlockIdBitmap.addLong(blockId);
+      succBlockIdBitmap.add(blockId);
     }
     assertEquals(0, result.getFailedBlockIds().size());
     assertEquals(blockIdBitmap, succBlockIdBitmap);
@@ -439,7 +440,7 @@
     registerShuffleServer(testAppId, 3, 2, 2, true);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     // When 2 servers are timeout, the block sending should fail
     enableTimeout((MockedShuffleServer) grpcShuffleServers.get(1), 500);
@@ -455,9 +456,9 @@
             expectedData,
             Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
     SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
-    Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet failedBlockIdBitmap = BlockIdSet.empty();
     for (Long blockId : result.getFailedBlockIds()) {
-      failedBlockIdBitmap.addLong(blockId);
+      failedBlockIdBitmap.add(blockId);
     }
     assertEquals(blockIdBitmap, failedBlockIdBitmap);
     assertEquals(0, result.getSuccessBlockIds().size());
@@ -501,7 +502,7 @@
     enableTimeout((MockedShuffleServer) grpcShuffleServers.get(2), 500);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     List<ShuffleBlockInfo> blocks =
         createShuffleBlockList(
             0,
@@ -513,13 +514,13 @@
             expectedData,
             Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
     SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
-    Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
-    Roaring64NavigableMap succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet failedBlockIdBitmap = BlockIdSet.empty();
+    BlockIdSet succBlockIdBitmap = BlockIdSet.empty();
     for (Long blockId : result.getSuccessBlockIds()) {
-      succBlockIdBitmap.addLong(blockId);
+      succBlockIdBitmap.add(blockId);
     }
     for (Long blockId : result.getFailedBlockIds()) {
-      failedBlockIdBitmap.addLong(blockId);
+      failedBlockIdBitmap.add(blockId);
     }
     assertEquals(blockIdBitmap, succBlockIdBitmap);
     assertEquals(0, failedBlockIdBitmap.getLongCardinality());
@@ -533,7 +534,7 @@
 
     shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, testAppId, 0, 0L, 1);
 
-    Roaring64NavigableMap report =
+    BlockIdSet report =
         shuffleWriteClientImpl.getShuffleResult(
             "GRPC",
             Sets.newHashSet(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
@@ -577,7 +578,7 @@
     enableTimeout((MockedShuffleServer) grpcShuffleServers.get(2), 500);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     for (int i = 0; i < 5; i++) {
       List<ShuffleBlockInfo> blocks =
@@ -613,7 +614,7 @@
     registerShuffleServer(testAppId, 3, 2, 2, true);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     final List<ShuffleBlockInfo> blocks =
         createShuffleBlockList(
             0,
@@ -633,7 +634,7 @@
     serverToPartitionToBlockIds.put(shuffleServerInfo1, partitionToBlockIds);
     serverToPartitionToBlockIds.put(shuffleServerInfo2, partitionToBlockIds);
     shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, testAppId, 0, 0L, 1);
-    Roaring64NavigableMap report =
+    BlockIdSet report =
         shuffleWriteClientImpl.getShuffleResult(
             "GRPC",
             Sets.newHashSet(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
@@ -644,9 +645,9 @@
 
     // data read should success
     SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
-    Roaring64NavigableMap succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet succBlockIdBitmap = BlockIdSet.empty();
     for (Long blockId : result.getSuccessBlockIds()) {
-      succBlockIdBitmap.addLong(blockId);
+      succBlockIdBitmap.add(blockId);
     }
     assertEquals(0, result.getFailedBlockIds().size());
     assertEquals(blockIdBitmap, succBlockIdBitmap);
@@ -687,9 +688,9 @@
     String testAppId = "case6";
     registerShuffleServer(testAppId, 3, 2, 2, true);
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap0 = Roaring64NavigableMap.bitmapOf();
-    Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
-    Roaring64NavigableMap blockIdBitmap2 = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap0 = BlockIdSet.empty();
+    BlockIdSet blockIdBitmap1 = BlockIdSet.empty();
+    BlockIdSet blockIdBitmap2 = BlockIdSet.empty();
 
     List<ShuffleBlockInfo> partition0 =
         createShuffleBlockList(
@@ -753,7 +754,7 @@
     registerShuffleServer(testAppId, 3, 2, 2, true);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
 
     // attempt to send data to "all servers", but only the server 0,1 receive data actually
@@ -811,7 +812,7 @@
     registerShuffleServer(testAppId, 3, 2, 2, true);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
 
     // attempt to send data to "all servers", but only the server 0,2 receive data actually
@@ -872,7 +873,7 @@
     registerShuffleServer(testAppId, 5, 3, 3, true);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
 
     // attempt to send data to "all servers", but only the server 0,1,2 receive data actually
@@ -926,7 +927,7 @@
     registerShuffleServer(testAppId, 5, 3, 3, true);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
 
     // attempt to send data to "all servers", but the secondary round is activated due to failures
@@ -980,7 +981,7 @@
     registerShuffleServer(testAppId, 5, 4, 2, true);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
 
     // attempt to send data to "all servers", but only the server 0,1,2 receive data actually
@@ -1035,7 +1036,7 @@
     registerShuffleServer(testAppId, 3, 2, 2, false);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
 
     for (int i = 0; i < 5; i++) {
@@ -1090,11 +1091,11 @@
       Map<Long, byte[]> expectedData,
       Roaring64NavigableMap blockIdBitmap) {
     CompressedShuffleBlock csb = readClient.readShuffleBlockData();
-    Roaring64NavigableMap matched = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet matched = BlockIdSet.empty();
     while (csb != null && csb.getByteBuffer() != null) {
       for (Map.Entry<Long, byte[]> entry : expectedData.entrySet()) {
         if (compareByte(entry.getValue(), csb.getByteBuffer())) {
-          matched.addLong(entry.getKey());
+          matched.add(entry.getKey());
           break;
         }
       }
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
index d082ae0..f00102a 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
@@ -43,6 +43,7 @@
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.ServerType;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.CoordinatorServer;
 import org.apache.uniffle.server.MockedGrpcServer;
@@ -152,7 +153,7 @@
     String testAppId = "testRpcRetryLogic";
     registerShuffleServer(testAppId, 3, 2, 2, true);
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
 
     List<ShuffleBlockInfo> blocks =
         createShuffleBlockList(
@@ -166,13 +167,13 @@
             Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
 
     SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
-    Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
-    Roaring64NavigableMap successfulBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet failedBlockIdBitmap = BlockIdSet.empty();
+    BlockIdSet successfulBlockIdBitmap = BlockIdSet.empty();
     for (Long blockId : result.getSuccessBlockIds()) {
-      successfulBlockIdBitmap.addLong(blockId);
+      successfulBlockIdBitmap.add(blockId);
     }
     for (Long blockId : result.getFailedBlockIds()) {
-      failedBlockIdBitmap.addLong(blockId);
+      failedBlockIdBitmap.add(blockId);
     }
     assertEquals(0, failedBlockIdBitmap.getLongCardinality());
     assertEquals(blockIdBitmap, successfulBlockIdBitmap);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
index b53df0d..6ce9b56 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
@@ -26,7 +26,6 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.client.TestUtils;
 import org.apache.uniffle.client.api.ShuffleReadClient;
@@ -41,6 +40,7 @@
 import org.apache.uniffle.common.segment.FixedSizeSegmentSplitter;
 import org.apache.uniffle.common.segment.SegmentSplitter;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.ChecksumUtils;
 
 public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
@@ -55,7 +55,7 @@
       long taskAttemptId,
       int blockNum,
       int length,
-      Roaring64NavigableMap blockIdBitmap,
+      BlockIdSet blockIdBitmap,
       Map<Long, byte[]> dataMap,
       List<ShuffleServerInfo> shuffleServerInfoList) {
     List<ShuffleBlockInfo> shuffleBlockInfoList = Lists.newArrayList();
@@ -65,7 +65,7 @@
       int seqno = ATOMIC_INT.getAndIncrement();
 
       long blockId = LAYOUT.getBlockId(seqno, 0, taskAttemptId);
-      blockIdBitmap.addLong(blockId);
+      blockIdBitmap.add(blockId);
       dataMap.put(blockId, buf);
       shuffleBlockInfoList.add(
           new ShuffleBlockInfo(
@@ -89,7 +89,7 @@
       long taskAttemptId,
       int blockNum,
       int length,
-      Roaring64NavigableMap blockIdBitmap,
+      BlockIdSet blockIdBitmap,
       Map<Long, byte[]> dataMap) {
     List<ShuffleServerInfo> shuffleServerInfoList =
         Lists.newArrayList(new ShuffleServerInfo("id", "host", 0));
@@ -105,9 +105,9 @@
   }
 
   public static Map<Integer, List<ShuffleBlockInfo>> createTestData(
-      Roaring64NavigableMap[] bitmaps, Map<Long, byte[]> expectedData) {
+      BlockIdSet[] bitmaps, Map<Long, byte[]> expectedData) {
     for (int i = 0; i < 4; i++) {
-      bitmaps[i] = Roaring64NavigableMap.bitmapOf();
+      bitmaps[i] = BlockIdSet.empty();
     }
     List<ShuffleBlockInfo> blocks1 =
         createShuffleBlockList(0, 0, 0, 3, 25, bitmaps[0], expectedData, mockSSI);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
index 7d5d114..cf487e1 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
@@ -54,6 +54,7 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;
@@ -120,12 +121,12 @@
             clientSpecifiedConcurrency);
     shuffleServerClient.registerShuffle(rrsr);
 
-    List<Roaring64NavigableMap> bitmaps = new ArrayList<>();
+    List<BlockIdSet> bitmaps = new ArrayList<>();
     Map<Long, byte[]> expectedDataList = new HashMap<>();
     IntStream.range(0, 20)
         .forEach(
             x -> {
-              Roaring64NavigableMap bitmap = Roaring64NavigableMap.bitmapOf();
+              BlockIdSet bitmap = BlockIdSet.empty();
               bitmaps.add(bitmap);
 
               Map<Long, byte[]> expectedData = Maps.newHashMap();
@@ -167,11 +168,11 @@
                 nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT))
             : new ShuffleServerInfo(
                 LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
-    Roaring64NavigableMap blocksBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blocksBitmap = BlockIdSet.empty();
     bitmaps.stream()
         .forEach(
             x -> {
-              Iterator<Long> iterator = x.iterator();
+              Iterator<Long> iterator = x.stream().iterator();
               while (iterator.hasNext()) {
                 blocksBitmap.add(iterator.next());
               }
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
index 2eee212..44a0f30 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
@@ -48,6 +48,7 @@
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.ServerType;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.ByteBufUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.CoordinatorServer;
@@ -131,7 +132,7 @@
         (client) -> {
           client.registerShuffle(rrsr);
         });
-    Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet expectBlockIds = BlockIdSet.empty();
     Map<Long, byte[]> dataMap = Maps.newHashMap();
     Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
     bitmaps[0] = Roaring64NavigableMap.bitmapOf();
@@ -254,7 +255,7 @@
       int shuffleId,
       int partitionId,
       List<ShuffleServerInfo> shuffleServerInfoList,
-      Roaring64NavigableMap expectBlockIds,
+      BlockIdSet expectBlockIds,
       StorageType storageType) {
     CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest();
     request.setStorageType(storageType.name());
@@ -269,7 +270,7 @@
     request.setShuffleServerInfoList(shuffleServerInfoList);
     request.setHadoopConf(conf);
     request.setExpectBlockIds(expectBlockIds);
-    Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet processBlockIds = BlockIdSet.empty();
     request.setProcessBlockIds(processBlockIds);
     request.setDistributionType(ShuffleDataDistributionType.NORMAL);
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 9b48662..d5070e8 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -72,6 +72,7 @@
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.proto.RssProtos;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
@@ -277,8 +278,8 @@
 
     req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 1, layout);
     RssGetShuffleResultResponse result = grpcShuffleServerClient.getShuffleResult(req);
-    Roaring64NavigableMap blockIdBitmap = result.getBlockIdBitmap();
-    assertEquals(Roaring64NavigableMap.bitmapOf(), blockIdBitmap);
+    BlockIdSet blockIdBitmap = result.getBlockIdBitmap();
+    assertEquals(BlockIdSet.empty(), blockIdBitmap);
 
     request = new RssReportShuffleResultRequest("shuffleResultTest", 0, 0L, partitionToBlockIds, 1);
     RssReportShuffleResultResponse response = grpcShuffleServerClient.reportShuffleResult(request);
@@ -286,21 +287,21 @@
     req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 1, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
-    Roaring64NavigableMap expectedP1 = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet expectedP1 = BlockIdSet.empty();
     addExpectedBlockIds(expectedP1, blockIds1);
     assertEquals(expectedP1, blockIdBitmap);
 
     req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 2, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
-    Roaring64NavigableMap expectedP2 = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet expectedP2 = BlockIdSet.empty();
     addExpectedBlockIds(expectedP2, blockIds2);
     assertEquals(expectedP2, blockIdBitmap);
 
     req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 3, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
-    Roaring64NavigableMap expectedP3 = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet expectedP3 = BlockIdSet.empty();
     addExpectedBlockIds(expectedP3, blockIds3);
     assertEquals(expectedP3, blockIdBitmap);
 
@@ -338,7 +339,7 @@
     req = new RssGetShuffleResultRequest("shuffleResultTest", 1, 1, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
-    assertEquals(Roaring64NavigableMap.bitmapOf(), blockIdBitmap);
+    assertEquals(BlockIdSet.empty(), blockIdBitmap);
 
     // test with bitmapNum > 1
     partitionToBlockIds = Maps.newHashMap();
@@ -351,7 +352,7 @@
     request = new RssReportShuffleResultRequest("shuffleResultTest", 2, 1L, partitionToBlockIds, 3);
     grpcShuffleServerClient.reportShuffleResult(request);
     // validate bitmap in shuffleTaskManager
-    Roaring64NavigableMap[] bitmaps =
+    BlockIdSet[] bitmaps =
         grpcShuffleServers
             .get(0)
             .getShuffleTaskManager()
@@ -363,21 +364,21 @@
     req = new RssGetShuffleResultRequest("shuffleResultTest", 2, 1, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
-    expectedP1 = Roaring64NavigableMap.bitmapOf();
+    expectedP1 = BlockIdSet.empty();
     addExpectedBlockIds(expectedP1, blockIds1);
     assertEquals(expectedP1, blockIdBitmap);
 
     req = new RssGetShuffleResultRequest("shuffleResultTest", 2, 2, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
-    expectedP2 = Roaring64NavigableMap.bitmapOf();
+    expectedP2 = BlockIdSet.empty();
     addExpectedBlockIds(expectedP2, blockIds2);
     assertEquals(expectedP2, blockIdBitmap);
 
     req = new RssGetShuffleResultRequest("shuffleResultTest", 2, 3, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
-    expectedP3 = Roaring64NavigableMap.bitmapOf();
+    expectedP3 = BlockIdSet.empty();
     addExpectedBlockIds(expectedP3, blockIds3);
     assertEquals(expectedP3, blockIdBitmap);
 
@@ -395,21 +396,21 @@
     req = new RssGetShuffleResultRequest("shuffleResultTest", 4, layout.maxPartitionId, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
-    expectedP1 = Roaring64NavigableMap.bitmapOf();
+    expectedP1 = BlockIdSet.empty();
     addExpectedBlockIds(expectedP1, blockIds1);
     assertEquals(expectedP1, blockIdBitmap);
 
     req = new RssGetShuffleResultRequest("shuffleResultTest", 4, 2, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
-    expectedP2 = Roaring64NavigableMap.bitmapOf();
+    expectedP2 = BlockIdSet.empty();
     addExpectedBlockIds(expectedP2, blockIds2);
     assertEquals(expectedP2, blockIdBitmap);
 
     req = new RssGetShuffleResultRequest("shuffleResultTest", 4, 3, layout);
     result = grpcShuffleServerClient.getShuffleResult(req);
     blockIdBitmap = result.getBlockIdBitmap();
-    expectedP3 = Roaring64NavigableMap.bitmapOf();
+    expectedP3 = BlockIdSet.empty();
     addExpectedBlockIds(expectedP3, blockIds3);
     assertEquals(expectedP3, blockIdBitmap);
 
@@ -729,15 +730,15 @@
     t2.join();
     t3.join();
 
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     for (Long blockId : expectedBlockIds) {
-      blockIdBitmap.addLong(blockId);
+      blockIdBitmap.add(blockId);
     }
 
     RssGetShuffleResultRequest req =
         new RssGetShuffleResultRequest("multipleShuffleResultTest", 1, 1, layout);
     RssGetShuffleResultResponse result = grpcShuffleServerClient.getShuffleResult(req);
-    Roaring64NavigableMap actualBlockIdBitmap = result.getBlockIdBitmap();
+    BlockIdSet actualBlockIdBitmap = result.getBlockIdBitmap();
     assertEquals(blockIdBitmap, actualBlockIdBitmap);
   }
 
@@ -1064,9 +1065,9 @@
     return blockIds;
   }
 
-  private void addExpectedBlockIds(Roaring64NavigableMap bitmap, List<Long> blockIds) {
+  private void addExpectedBlockIds(BlockIdSet bitmap, List<Long> blockIds) {
     for (int i = 0; i < blockIds.size(); i++) {
-      bitmap.addLong(blockIds.get(i));
+      bitmap.add(blockIds.get(i));
     }
   }
 }
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java
index 67a9dc0..bf3aeb9 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java
@@ -48,6 +48,7 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
@@ -137,7 +138,7 @@
             appId, 0, Lists.newArrayList(new PartitionRange(2, 3)), dataBasePath);
     shuffleServerClient.registerShuffle(rrsr);
 
-    Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[4];
+    BlockIdSet[] bitmaps = new BlockIdSet[4];
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     Map<Integer, List<ShuffleBlockInfo>> dataBlocks = createTestData(bitmaps, expectedData);
     Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
@@ -248,15 +249,13 @@
   }
 
   protected void validateResult(
-      ShuffleReadClientImpl readClient,
-      Map<Long, byte[]> expectedData,
-      Roaring64NavigableMap blockIdBitmap) {
+      ShuffleReadClientImpl readClient, Map<Long, byte[]> expectedData, BlockIdSet blockIdBitmap) {
     CompressedShuffleBlock csb = readClient.readShuffleBlockData();
-    Roaring64NavigableMap matched = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet matched = BlockIdSet.empty();
     while (csb != null && csb.getByteBuffer() != null) {
       for (Entry<Long, byte[]> entry : expectedData.entrySet()) {
         if (compareByte(entry.getValue(), csb.getByteBuffer())) {
-          matched.addLong(entry.getKey());
+          matched.add(entry.getKey());
           break;
         }
       }
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
index 77330fe..3a72068 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
@@ -56,6 +56,7 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.CoordinatorServer;
@@ -191,9 +192,9 @@
   }
 
   private Map<Integer, List<ShuffleBlockInfo>> createTestData(
-      Roaring64NavigableMap[] bitmaps, Map<Long, byte[]> expectedData) {
+      BlockIdSet[] bitmaps, Map<Long, byte[]> expectedData) {
     for (int i = 0; i < 4; i++) {
-      bitmaps[i] = Roaring64NavigableMap.bitmapOf();
+      bitmaps[i] = BlockIdSet.empty();
     }
     List<ShuffleBlockInfo> blocks1 =
         ShuffleReadWriteBase.createShuffleBlockList(
@@ -265,7 +266,7 @@
             ShuffleDataDistributionType.NORMAL);
     shuffleServerClient.registerShuffle(rrsr);
 
-    Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[4];
+    BlockIdSet[] bitmaps = new BlockIdSet[4];
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     Map<Integer, List<ShuffleBlockInfo>> dataBlocks = createTestData(bitmaps, expectedData);
     Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
@@ -376,15 +377,13 @@
   }
 
   protected void validateResult(
-      ShuffleReadClientImpl readClient,
-      Map<Long, byte[]> expectedData,
-      Roaring64NavigableMap blockIdBitmap) {
+      ShuffleReadClientImpl readClient, Map<Long, byte[]> expectedData, BlockIdSet blockIdBitmap) {
     CompressedShuffleBlock csb = readClient.readShuffleBlockData();
-    Roaring64NavigableMap matched = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet matched = BlockIdSet.empty();
     while (csb != null && csb.getByteBuffer() != null) {
       for (Map.Entry<Long, byte[]> entry : expectedData.entrySet()) {
         if (TestUtils.compareByte(entry.getValue(), csb.getByteBuffer())) {
-          matched.addLong(entry.getKey());
+          matched.add(entry.getKey());
           break;
         }
       }
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java
index b0ed66d..2a83538 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java
@@ -56,6 +56,7 @@
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.segment.LocalOrderSegmentSplitter;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServerConf;
@@ -127,9 +128,9 @@
   }
 
   public static Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> createTestDataWithMultiMapIdx(
-      Roaring64NavigableMap[] bitmaps, Map<Long, byte[]> expectedData) {
+      BlockIdSet[] bitmaps, Map<Long, byte[]> expectedData) {
     for (int i = 0; i < 4; i++) {
-      bitmaps[i] = Roaring64NavigableMap.bitmapOf();
+      bitmaps[i] = BlockIdSet.empty();
     }
 
     // key: mapIdx
@@ -192,7 +193,7 @@
 
     /** Write the data to shuffle-servers */
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap[] bitMaps = new Roaring64NavigableMap[4];
+    BlockIdSet[] bitMaps = new BlockIdSet[4];
 
     // Create the shuffle block with the mapIdx
     Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> partitionToBlocksWithMapIdx =
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
index a77472e..3990ee3 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
@@ -19,6 +19,7 @@
 
 import java.io.File;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,8 +35,6 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.roaringbitmap.longlong.LongIterator;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
 import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient;
@@ -50,6 +49,7 @@
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServer;
@@ -149,7 +149,7 @@
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
 
-    Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[4];
+    BlockIdSet[] bitmaps = new BlockIdSet[4];
     Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = createTestData(bitmaps, expectedData);
 
     Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
@@ -207,9 +207,9 @@
     assertEquals(expectedBlockIds.size(), matched);
   }
 
-  private Set<Long> transBitmapToSet(Roaring64NavigableMap blockIdBitmap) {
+  private Set<Long> transBitmapToSet(BlockIdSet blockIdBitmap) {
     Set<Long> blockIds = Sets.newHashSet();
-    LongIterator iter = blockIdBitmap.getLongIterator();
+    Iterator<Long> iter = blockIdBitmap.stream().iterator();
     while (iter.hasNext()) {
       blockIds.add(iter.next());
     }
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java
index f969397..f29deae 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java
@@ -49,6 +49,7 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.ByteBufUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServer;
@@ -166,7 +167,7 @@
             Lists.newArrayList(new PartitionRange(0, 0)),
             String.format(REMOTE_STORAGE, isNettyMode));
     shuffleServerClient.registerShuffle(rrsr);
-    Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet expectBlockIds = BlockIdSet.empty();
     Map<Long, byte[]> dataMap = Maps.newHashMap();
     Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
     bitmaps[0] = Roaring64NavigableMap.bitmapOf();
@@ -184,7 +185,7 @@
     RssSendShuffleDataResponse response = shuffleServerClient.sendShuffleData(rssdr);
     assertSame(StatusCode.SUCCESS, response.getStatusCode());
 
-    Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet processBlockIds = BlockIdSet.empty();
     Roaring64NavigableMap exceptTaskIds = Roaring64NavigableMap.bitmapOf(0);
     // read the 1-th segment from memory
     MemoryClientReadHandler memoryClientReadHandler =
@@ -236,9 +237,9 @@
     expectedData.put(blocks.get(2).getBlockId(), ByteBufUtils.readBytes(blocks.get(2).getData()));
     ShuffleDataResult sdr = composedClientReadHandler.readShuffleData();
     validateResult(expectedData, sdr);
-    processBlockIds.addLong(blocks.get(0).getBlockId());
-    processBlockIds.addLong(blocks.get(1).getBlockId());
-    processBlockIds.addLong(blocks.get(2).getBlockId());
+    processBlockIds.add(blocks.get(0).getBlockId());
+    processBlockIds.add(blocks.get(1).getBlockId());
+    processBlockIds.add(blocks.get(2).getBlockId());
     sdr.getBufferSegments()
         .forEach(bs -> composedClientReadHandler.updateConsumedBlockInfo(bs, checkSkippedMetrics));
 
@@ -261,8 +262,8 @@
     expectedData.put(blocks2.get(0).getBlockId(), ByteBufUtils.readBytes(blocks2.get(0).getData()));
     expectedData.put(blocks2.get(1).getBlockId(), ByteBufUtils.readBytes(blocks2.get(1).getData()));
     validateResult(expectedData, sdr);
-    processBlockIds.addLong(blocks2.get(0).getBlockId());
-    processBlockIds.addLong(blocks2.get(1).getBlockId());
+    processBlockIds.add(blocks2.get(0).getBlockId());
+    processBlockIds.add(blocks2.get(1).getBlockId());
     sdr.getBufferSegments()
         .forEach(bs -> composedClientReadHandler.updateConsumedBlockInfo(bs, checkSkippedMetrics));
 
@@ -271,7 +272,7 @@
     expectedData.clear();
     expectedData.put(blocks2.get(2).getBlockId(), ByteBufUtils.readBytes(blocks2.get(2).getData()));
     validateResult(expectedData, sdr);
-    processBlockIds.addLong(blocks2.get(2).getBlockId());
+    processBlockIds.add(blocks2.get(2).getBlockId());
     sdr.getBufferSegments()
         .forEach(bs -> composedClientReadHandler.updateConsumedBlockInfo(bs, checkSkippedMetrics));
 
@@ -293,8 +294,8 @@
     expectedData.put(blocks3.get(0).getBlockId(), ByteBufUtils.readBytes(blocks3.get(0).getData()));
     expectedData.put(blocks3.get(1).getBlockId(), ByteBufUtils.readBytes(blocks3.get(1).getData()));
     validateResult(expectedData, sdr);
-    processBlockIds.addLong(blocks3.get(0).getBlockId());
-    processBlockIds.addLong(blocks3.get(1).getBlockId());
+    processBlockIds.add(blocks3.get(0).getBlockId());
+    processBlockIds.add(blocks3.get(1).getBlockId());
     sdr.getBufferSegments()
         .forEach(bs -> composedClientReadHandler.updateConsumedBlockInfo(bs, checkSkippedMetrics));
 
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
index a2de8f7..a8a2834 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
@@ -47,6 +47,7 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.ByteBufUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServer;
@@ -143,10 +144,10 @@
         new RssRegisterShuffleRequest(
             testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), "");
     shuffleServerClient.registerShuffle(rrsr);
-    Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet expectBlockIds = BlockIdSet.empty();
     Map<Long, byte[]> dataMap = Maps.newHashMap();
-    Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
-    bitmaps[0] = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet[] bitmaps = new BlockIdSet[1];
+    bitmaps[0] = BlockIdSet.empty();
     List<ShuffleBlockInfo> blocks =
         createShuffleBlockList(shuffleId, partitionId, 0, 3, 25, expectBlockIds, dataMap, mockSSI);
     Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
@@ -201,7 +202,7 @@
         new MemoryClientReadHandler(
             testAppId, shuffleId, partitionId, 50, shuffleServerClient, exceptTaskIds);
 
-    Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet processBlockIds = BlockIdSet.empty();
     LocalFileClientReadHandler localFileQuorumClientReadHandler =
         new LocalFileClientReadHandler(
             testAppId,
@@ -235,8 +236,8 @@
     validateResult(expectedData, sdr);
 
     // send data to shuffle server, flush should happen
-    processBlockIds.addLong(blocks.get(0).getBlockId());
-    processBlockIds.addLong(blocks.get(1).getBlockId());
+    processBlockIds.add(blocks.get(0).getBlockId());
+    processBlockIds.add(blocks.get(1).getBlockId());
 
     List<ShuffleBlockInfo> blocks2 =
         createShuffleBlockList(shuffleId, partitionId, 0, 3, 50, expectBlockIds, dataMap, mockSSI);
@@ -269,20 +270,20 @@
     expectedData.put(blocks.get(2).getBlockId(), ByteBufUtils.readBytes(blocks.get(2).getData()));
     expectedData.put(blocks2.get(0).getBlockId(), ByteBufUtils.readBytes(blocks2.get(0).getData()));
     validateResult(expectedData, sdr);
-    processBlockIds.addLong(blocks.get(2).getBlockId());
-    processBlockIds.addLong(blocks2.get(0).getBlockId());
+    processBlockIds.add(blocks.get(2).getBlockId());
+    processBlockIds.add(blocks2.get(0).getBlockId());
 
     sdr = composedClientReadHandler.readShuffleData();
     expectedData.clear();
     expectedData.put(blocks2.get(1).getBlockId(), ByteBufUtils.readBytes(blocks2.get(1).getData()));
     validateResult(expectedData, sdr);
-    processBlockIds.addLong(blocks2.get(1).getBlockId());
+    processBlockIds.add(blocks2.get(1).getBlockId());
 
     sdr = composedClientReadHandler.readShuffleData();
     expectedData.clear();
     expectedData.put(blocks2.get(2).getBlockId(), ByteBufUtils.readBytes(blocks2.get(2).getData()));
     validateResult(expectedData, sdr);
-    processBlockIds.addLong(blocks2.get(2).getBlockId());
+    processBlockIds.add(blocks2.get(2).getBlockId());
 
     sdr = composedClientReadHandler.readShuffleData();
     assertNull(sdr);
@@ -304,10 +305,10 @@
         new RssRegisterShuffleRequest(
             testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), "");
     shuffleServerClient.registerShuffle(rrsr);
-    Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet expectBlockIds = BlockIdSet.empty();
     Map<Long, byte[]> dataMap = Maps.newHashMap();
-    Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
-    bitmaps[0] = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet[] bitmaps = new BlockIdSet[1];
+    bitmaps[0] = BlockIdSet.empty();
     // create blocks which belong to different tasks
     List<ShuffleBlockInfo> blocks = Lists.newArrayList();
     for (int i = 0; i < 3; i++) {
@@ -389,10 +390,10 @@
         new RssRegisterShuffleRequest(
             testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), "");
     shuffleServerClient.registerShuffle(rrsr);
-    Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet expectBlockIds = BlockIdSet.empty();
     Map<Long, byte[]> dataMap = Maps.newHashMap();
-    Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
-    bitmaps[0] = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet[] bitmaps = new BlockIdSet[1];
+    bitmaps[0] = BlockIdSet.empty();
     List<ShuffleBlockInfo> blocks =
         createShuffleBlockList(shuffleId, partitionId, 0, 3, 25, expectBlockIds, dataMap, mockSSI);
     Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
@@ -407,7 +408,7 @@
     assertSame(StatusCode.SUCCESS, response.getStatusCode());
 
     // read the 1-th segment from memory
-    Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet processBlockIds = BlockIdSet.empty();
     Roaring64NavigableMap exceptTaskIds = Roaring64NavigableMap.bitmapOf(0);
     MemoryClientReadHandler memoryClientReadHandler =
         new MemoryClientReadHandler(
@@ -444,9 +445,9 @@
     expectedData.put(blocks.get(2).getBlockId(), ByteBufUtils.readBytes(blocks.get(2).getData()));
     ShuffleDataResult sdr = composedClientReadHandler.readShuffleData();
     validateResult(expectedData, sdr);
-    processBlockIds.addLong(blocks.get(0).getBlockId());
-    processBlockIds.addLong(blocks.get(1).getBlockId());
-    processBlockIds.addLong(blocks.get(2).getBlockId());
+    processBlockIds.add(blocks.get(0).getBlockId());
+    processBlockIds.add(blocks.get(1).getBlockId());
+    processBlockIds.add(blocks.get(2).getBlockId());
 
     // send data to shuffle server, and wait until flush finish
     List<ShuffleBlockInfo> blocks2 =
@@ -480,15 +481,15 @@
     expectedData.put(blocks2.get(0).getBlockId(), ByteBufUtils.readBytes(blocks2.get(0).getData()));
     expectedData.put(blocks2.get(1).getBlockId(), ByteBufUtils.readBytes(blocks2.get(1).getData()));
     validateResult(expectedData, sdr);
-    processBlockIds.addLong(blocks2.get(0).getBlockId());
-    processBlockIds.addLong(blocks2.get(1).getBlockId());
+    processBlockIds.add(blocks2.get(0).getBlockId());
+    processBlockIds.add(blocks2.get(1).getBlockId());
 
     // read the 3-th segment from localFile
     sdr = composedClientReadHandler.readShuffleData();
     expectedData.clear();
     expectedData.put(blocks2.get(2).getBlockId(), ByteBufUtils.readBytes(blocks2.get(2).getData()));
     validateResult(expectedData, sdr);
-    processBlockIds.addLong(blocks2.get(2).getBlockId());
+    processBlockIds.add(blocks2.get(2).getBlockId());
 
     // all segments are processed
     sdr = composedClientReadHandler.readShuffleData();
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index de96781..895660a 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -46,6 +46,7 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.RetryUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -137,7 +138,7 @@
         ShuffleDataDistributionType.NORMAL,
         -1);
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
 
     // simulator a failed server
     ShuffleServerInfo fakeShuffleServerInfo =
@@ -154,13 +155,13 @@
             Lists.newArrayList(shuffleServerInfo1, fakeShuffleServerInfo));
     SendShuffleDataResult result =
         shuffleWriteClientImpl.sendShuffleData(testAppId, blocks, () -> false);
-    Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
-    Roaring64NavigableMap succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet failedBlockIdBitmap = BlockIdSet.empty();
+    BlockIdSet succBlockIdBitmap = BlockIdSet.empty();
     for (Long blockId : result.getFailedBlockIds()) {
-      failedBlockIdBitmap.addLong(blockId);
+      failedBlockIdBitmap.add(blockId);
     }
     for (Long blockId : result.getSuccessBlockIds()) {
-      succBlockIdBitmap.addLong(blockId);
+      succBlockIdBitmap.add(blockId);
     }
     // There will no failed blocks when replica=2
     assertEquals(failedBlockIdBitmap.getLongCardinality(), 0);
@@ -178,7 +179,7 @@
     serverToPartitionToBlockIds.put(shuffleServerInfo1, ptb);
     serverToPartitionToBlockIds.put(fakeShuffleServerInfo, ptb);
     shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, testAppId, 0, 0, 2);
-    Roaring64NavigableMap report =
+    BlockIdSet report =
         shuffleWriteClientImpl.getShuffleResult(
             "GRPC", Sets.newHashSet(shuffleServerInfo1, fakeShuffleServerInfo), testAppId, 0, 0);
     assertEquals(blockIdBitmap, report);
@@ -218,7 +219,7 @@
     serverToPartitionToBlockIds.put(shuffleServerInfo1, partitionToBlocks);
     // case1
     shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, testAppId, 1, 0, 1);
-    Roaring64NavigableMap bitmap =
+    BlockIdSet bitmap =
         shuffleWriteClientImpl.getShuffleResult(
             "GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 0);
     assertTrue(bitmap.isEmpty());
@@ -277,7 +278,7 @@
 
     shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, testAppId, 1, 0, 1);
 
-    Roaring64NavigableMap bitmap =
+    BlockIdSet bitmap =
         shuffleWriteClientImpl.getShuffleResult(
             "GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 0);
     assertTrue(bitmap.isEmpty());
@@ -334,7 +335,7 @@
         ShuffleDataDistributionType.NORMAL,
         -1);
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
 
     List<ShuffleBlockInfo> blocks =
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
index 0401b2c..bfa5215 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
@@ -43,7 +43,6 @@
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
@@ -55,6 +54,7 @@
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.shuffle.manager.RssShuffleManagerBase;
 import org.apache.uniffle.storage.util.StorageType;
@@ -225,7 +225,7 @@
               .collect(Collectors.toSet());
 
       for (int partitionId : new int[] {0, 1}) {
-        Roaring64NavigableMap blockIdLongs =
+        BlockIdSet blockIdLongs =
             shuffleWriteClient.getShuffleResult(
                 ClientType.GRPC.name(), servers, shuffleManager.getAppId(), 0, partitionId);
         List<BlockId> blockIds =
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index 11e6054..ae9128d 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -18,6 +18,7 @@
 package org.apache.uniffle.test;
 
 import java.io.File;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -32,7 +33,6 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.roaringbitmap.longlong.LongIterator;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
@@ -50,7 +50,7 @@
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.BlockIdLayout;
-import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;
@@ -157,10 +157,10 @@
     BlockIdLayout layout = BlockIdLayout.DEFAULT;
     registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     createTestData(testAppId, expectedData, blockIdBitmap, taskIdBitmap, isNettyMode);
-    blockIdBitmap.addLong(layout.getBlockId(0, 1, 0));
+    blockIdBitmap.add(layout.getBlockId(0, 1, 0));
     ShuffleReadClientImpl readClient;
     readClient =
         baseReadBuilder(isNettyMode)
@@ -187,7 +187,7 @@
     registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     List<ShuffleBlockInfo> blocks =
         createShuffleBlockList(0, 0, 0, 2, 30, blockIdBitmap, expectedData, mockSSI);
@@ -214,7 +214,7 @@
     registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     List<ShuffleBlockInfo> blocks =
         createShuffleBlockList(0, 0, 0, 2, 30, blockIdBitmap, expectedData, mockSSI);
     sendTestData(testAppId, blocks, isNettyMode);
@@ -252,13 +252,13 @@
 
     Map<Long, byte[]> expectedData1 = Maps.newHashMap();
     Map<Long, byte[]> expectedData2 = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap1 = BlockIdSet.empty();
     final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     List<ShuffleBlockInfo> blocks =
         createShuffleBlockList(0, 0, 0, 10, 30, blockIdBitmap1, expectedData1, mockSSI);
     sendTestData(testAppId, blocks, isNettyMode);
 
-    Roaring64NavigableMap blockIdBitmap2 = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap2 = BlockIdSet.empty();
     blocks = createShuffleBlockList(0, 1, 0, 10, 30, blockIdBitmap2, expectedData2, mockSSI);
     sendTestData(testAppId, blocks, isNettyMode);
 
@@ -298,7 +298,7 @@
             .appId(testAppId)
             .partitionId(1)
             .partitionNumPerRange(2)
-            .blockIdBitmap(Roaring64NavigableMap.bitmapOf())
+            .blockIdBitmap(BlockIdSet.empty())
             .taskIdBitmap(Roaring64NavigableMap.bitmapOf())
             .build();
     assertNull(readClient.readShuffleBlockData());
@@ -313,17 +313,17 @@
     registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     List<ShuffleBlockInfo> blocks =
         createShuffleBlockList(0, 0, 0, 5, 30, blockIdBitmap, expectedData, mockSSI);
     sendTestData(testAppId, blocks, isNettyMode);
 
-    Roaring64NavigableMap wrongBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
-    LongIterator iter = blockIdBitmap.getLongIterator();
+    BlockIdSet wrongBlockIdBitmap = BlockIdSet.empty();
+    Iterator<Long> iter = blockIdBitmap.stream().iterator();
     while (iter.hasNext()) {
       BlockId blockId = layout.asBlockId(iter.next());
-      wrongBlockIdBitmap.addLong(
+      wrongBlockIdBitmap.add(
           layout.getBlockId(blockId.sequenceNo, blockId.partitionId + 1, blockId.taskAttemptId));
     }
 
@@ -349,7 +349,7 @@
     registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 1);
 
     List<ShuffleBlockInfo> blocks =
@@ -382,24 +382,20 @@
     registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 3);
     List<ShuffleBlockInfo> blocks =
         createShuffleBlockList(0, 0, 0, 5, 30, blockIdBitmap, expectedData, mockSSI);
     sendTestData(testAppId, blocks, isNettyMode);
 
     // test case: data generated by speculation task without report result
-    blocks =
-        createShuffleBlockList(
-            0, 0, 1, 5, 30, Roaring64NavigableMap.bitmapOf(), Maps.newHashMap(), mockSSI);
+    blocks = createShuffleBlockList(0, 0, 1, 5, 30, BlockIdSet.empty(), Maps.newHashMap(), mockSSI);
     sendTestData(testAppId, blocks, isNettyMode);
     // test case: data generated by speculation task with report result
     blocks = createShuffleBlockList(0, 0, 2, 5, 30, blockIdBitmap, Maps.newHashMap(), mockSSI);
     sendTestData(testAppId, blocks, isNettyMode);
 
-    blocks =
-        createShuffleBlockList(
-            0, 0, 3, 5, 30, Roaring64NavigableMap.bitmapOf(), Maps.newHashMap(), mockSSI);
+    blocks = createShuffleBlockList(0, 0, 3, 5, 30, BlockIdSet.empty(), Maps.newHashMap(), mockSSI);
     sendTestData(testAppId, blocks, isNettyMode);
 
     // unexpected taskAttemptId should be filtered
@@ -421,13 +417,13 @@
     String testAppId = "localReadTest9";
     registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet blockIdBitmap = BlockIdSet.empty();
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
 
     List<ShuffleBlockInfo> blocks;
 
     createTestData(testAppId, expectedData, blockIdBitmap, taskIdBitmap, isNettyMode);
-    Roaring64NavigableMap beforeAdded = RssUtils.cloneBitMap(blockIdBitmap);
+    BlockIdSet beforeAdded = blockIdBitmap.copy();
     // write data by another task, read data again, the cache for index file should be updated
     blocks = createShuffleBlockList(0, 0, 1, 3, 25, blockIdBitmap, Maps.newHashMap(), mockSSI);
     sendTestData(testAppId, blocks, isNettyMode);
@@ -467,8 +463,8 @@
     registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap expectedBlockIds = Roaring64NavigableMap.bitmapOf();
-    Roaring64NavigableMap unexpectedBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet expectedBlockIds = BlockIdSet.empty();
+    BlockIdSet unexpectedBlockIds = BlockIdSet.empty();
     final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 1);
     // send some expected data
     List<ShuffleBlockInfo> blocks =
@@ -528,7 +524,7 @@
   private void createTestData(
       String testAppId,
       Map<Long, byte[]> expectedData,
-      Roaring64NavigableMap blockIdBitmap,
+      BlockIdSet blockIdBitmap,
       Roaring64NavigableMap taskIdBitmap,
       boolean isNettyMode) {
     List<ShuffleBlockInfo> blocks =
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleResultResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleResultResponse.java
index aca33aa..5003071 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleResultResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleResultResponse.java
@@ -19,24 +19,24 @@
 
 import java.io.IOException;
 
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
-
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
+import org.apache.uniffle.common.util.RoaringBitmapBlockIdSet;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.proto.RssProtos;
 
 public class RssGetShuffleResultResponse extends ClientResponse {
 
-  private Roaring64NavigableMap blockIdBitmap;
+  private BlockIdSet blockIdBitmap;
 
   public RssGetShuffleResultResponse(StatusCode statusCode, byte[] serializedBitmap)
       throws IOException {
     super(statusCode);
-    blockIdBitmap = RssUtils.deserializeBitMap(serializedBitmap);
+    blockIdBitmap = new RoaringBitmapBlockIdSet(RssUtils.deserializeBitMap(serializedBitmap));
   }
 
-  public Roaring64NavigableMap getBlockIdBitmap() {
+  public BlockIdSet getBlockIdBitmap() {
     return blockIdBitmap;
   }
 
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 9237884..4513ea8 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -27,13 +27,13 @@
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.server.flush.EventDiscardException;
@@ -58,8 +58,7 @@
   private final ShuffleServerConf shuffleServerConf;
   private Configuration hadoopConf;
   // appId -> shuffleId -> committed shuffle blockIds
-  private Map<String, Map<Integer, Roaring64NavigableMap>> committedBlockIds =
-      JavaUtils.newConcurrentMap();
+  private Map<String, Map<Integer, BlockIdSet>> committedBlockIds = JavaUtils.newConcurrentMap();
   private final int retryMax;
 
   private final StorageManager storageManager;
@@ -209,23 +208,19 @@
       return;
     }
     committedBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
-    Map<Integer, Roaring64NavigableMap> shuffleToBlockIds = committedBlockIds.get(appId);
-    shuffleToBlockIds.computeIfAbsent(shuffleId, key -> Roaring64NavigableMap.bitmapOf());
-    Roaring64NavigableMap bitmap = shuffleToBlockIds.get(shuffleId);
-    synchronized (bitmap) {
-      for (ShufflePartitionedBlock spb : blocks) {
-        bitmap.addLong(spb.getBlockId());
-      }
-    }
+    Map<Integer, BlockIdSet> shuffleToBlockIds = committedBlockIds.get(appId);
+    shuffleToBlockIds.computeIfAbsent(shuffleId, key -> BlockIdSet.empty());
+    BlockIdSet blockIds = shuffleToBlockIds.get(shuffleId);
+    blockIds.addAll(blocks.stream().mapToLong(ShufflePartitionedBlock::getBlockId));
   }
 
-  public Roaring64NavigableMap getCommittedBlockIds(String appId, Integer shuffleId) {
-    Map<Integer, Roaring64NavigableMap> shuffleIdToBlockIds = committedBlockIds.get(appId);
+  public BlockIdSet getCommittedBlockIds(String appId, Integer shuffleId) {
+    Map<Integer, BlockIdSet> shuffleIdToBlockIds = committedBlockIds.get(appId);
     if (shuffleIdToBlockIds == null) {
       LOG.warn("Unexpected value when getCommittedBlockIds for appId[" + appId + "]");
-      return Roaring64NavigableMap.bitmapOf();
+      return BlockIdSet.empty();
     }
-    Roaring64NavigableMap blockIds = shuffleIdToBlockIds.get(shuffleId);
+    BlockIdSet blockIds = shuffleIdToBlockIds.get(shuffleId);
     if (blockIds == null) {
       LOG.warn(
           "Unexpected value when getCommittedBlockIds for appId["
@@ -233,7 +228,7 @@
               + "], shuffleId["
               + shuffleId
               + "]");
-      return Roaring64NavigableMap.bitmapOf();
+      return BlockIdSet.empty();
     }
     return blockIds;
   }
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index f45b1be..42efa35 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -25,11 +25,11 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.Sets;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.JavaUtils;
 
 /**
@@ -46,7 +46,7 @@
 
   private Map<Integer, Object> commitLocks;
   /** shuffleId -> blockIds */
-  private Map<Integer, Roaring64NavigableMap> cachedBlockIds;
+  private Map<Integer, BlockIdSet> cachedBlockIds;
 
   private AtomicReference<String> user;
 
@@ -93,7 +93,7 @@
     return commitLocks;
   }
 
-  public Map<Integer, Roaring64NavigableMap> getCachedBlockIds() {
+  public Map<Integer, BlockIdSet> getCachedBlockIds() {
     return cachedBlockIds;
   }
 
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 87e5a5e..c7f1c4a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -63,9 +63,9 @@
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.JavaUtils;
-import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
@@ -106,7 +106,7 @@
   // merge different blockId of partition to one bitmap can reduce memory cost,
   // but when get blockId, performance will degrade a little which can be optimized by client
   // configuration
-  private Map<String, Map<Integer, Roaring64NavigableMap[]>> partitionsToBlockIds;
+  private Map<String, Map<Integer, BlockIdSet[]>> partitionsToBlockIds;
   private final ShuffleBufferManager shuffleBufferManager;
   private Map<String, ShuffleTaskInfo> shuffleTaskInfos = JavaUtils.newConcurrentMap();
   private Map<Long, PreAllocatedBufferInfo> requireBufferIds = JavaUtils.newConcurrentMap();
@@ -330,8 +330,8 @@
   public StatusCode commitShuffle(String appId, int shuffleId) throws Exception {
     long start = System.currentTimeMillis();
     refreshAppId(appId);
-    Roaring64NavigableMap cachedBlockIds = getCachedBlockIds(appId, shuffleId);
-    Roaring64NavigableMap cloneBlockIds;
+    BlockIdSet cachedBlockIds = getCachedBlockIds(appId, shuffleId);
+    BlockIdSet cloneBlockIds;
     ShuffleTaskInfo shuffleTaskInfo =
         shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo(appId));
     Object lock = shuffleTaskInfo.getCommitLocks().computeIfAbsent(shuffleId, x -> new Object());
@@ -340,20 +340,17 @@
       if (System.currentTimeMillis() - start > commitTimeout) {
         throw new RssException("Shuffle data commit timeout for " + commitTimeout + " ms");
       }
-      synchronized (cachedBlockIds) {
-        cloneBlockIds = RssUtils.cloneBitMap(cachedBlockIds);
-      }
+      cloneBlockIds = cachedBlockIds.copy();
       long expectedCommitted = cloneBlockIds.getLongCardinality();
       shuffleBufferManager.commitShuffleTask(appId, shuffleId);
-      Roaring64NavigableMap committedBlockIds;
-      Roaring64NavigableMap cloneCommittedBlockIds;
+      BlockIdSet committedBlockIds;
+      BlockIdSet cloneCommittedBlockIds;
       long checkInterval = 1000L;
       while (true) {
         committedBlockIds = shuffleFlushManager.getCommittedBlockIds(appId, shuffleId);
-        synchronized (committedBlockIds) {
-          cloneCommittedBlockIds = RssUtils.cloneBitMap(committedBlockIds);
-        }
-        cloneBlockIds.andNot(cloneCommittedBlockIds);
+        // create thread-safe copy of committed block ids, then remove them from cloneBlockIds
+        cloneCommittedBlockIds = committedBlockIds.copy();
+        cloneBlockIds.removeAll(cloneCommittedBlockIds);
         if (cloneBlockIds.isEmpty()) {
           break;
         }
@@ -390,20 +387,20 @@
   public void addFinishedBlockIds(
       String appId, Integer shuffleId, Map<Integer, long[]> partitionToBlockIds, int bitmapNum) {
     refreshAppId(appId);
-    Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
+    Map<Integer, BlockIdSet[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
     if (shuffleIdToPartitions == null) {
       throw new RssException("appId[" + appId + "] is expired!");
     }
     shuffleIdToPartitions.computeIfAbsent(
         shuffleId,
         key -> {
-          Roaring64NavigableMap[] blockIds = new Roaring64NavigableMap[bitmapNum];
+          BlockIdSet[] blockIds = new BlockIdSet[bitmapNum];
           for (int i = 0; i < bitmapNum; i++) {
-            blockIds[i] = Roaring64NavigableMap.bitmapOf();
+            blockIds[i] = BlockIdSet.empty();
           }
           return blockIds;
         });
-    Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
+    BlockIdSet[] blockIds = shuffleIdToPartitions.get(shuffleId);
     if (blockIds.length != bitmapNum) {
       throw new InvalidRequestException(
           "Request expects "
@@ -415,12 +412,8 @@
 
     for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) {
       Integer partitionId = entry.getKey();
-      Roaring64NavigableMap bitmap = blockIds[partitionId % bitmapNum];
-      synchronized (bitmap) {
-        for (long blockId : entry.getValue()) {
-          bitmap.addLong(blockId);
-        }
-      }
+      BlockIdSet bitmap = blockIds[partitionId % bitmapNum];
+      bitmap.addAll(Arrays.stream(entry.getValue()));
     }
   }
 
@@ -444,28 +437,22 @@
     }
     ShuffleTaskInfo shuffleTaskInfo =
         shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo(appId));
-    Roaring64NavigableMap bitmap =
-        shuffleTaskInfo
-            .getCachedBlockIds()
-            .computeIfAbsent(shuffleId, x -> Roaring64NavigableMap.bitmapOf());
+    BlockIdSet blockIds =
+        shuffleTaskInfo.getCachedBlockIds().computeIfAbsent(shuffleId, x -> BlockIdSet.empty());
 
     long size = 0L;
-    synchronized (bitmap) {
-      for (ShufflePartitionedBlock spb : spbs) {
-        bitmap.addLong(spb.getBlockId());
-        size += spb.getSize();
-      }
-    }
+    blockIds.addAll(Arrays.stream(spbs).mapToLong(ShufflePartitionedBlock::getBlockId));
+    size += Arrays.stream(spbs).mapToLong(ShufflePartitionedBlock::getSize).sum();
     long partitionSize = shuffleTaskInfo.addPartitionDataSize(shuffleId, partitionId, size);
     if (shuffleBufferManager.isHugePartition(partitionSize)) {
       shuffleTaskInfo.markHugePartition(shuffleId, partitionId);
     }
   }
 
-  public Roaring64NavigableMap getCachedBlockIds(String appId, int shuffleId) {
-    Map<Integer, Roaring64NavigableMap> shuffleIdToBlockIds =
+  public BlockIdSet getCachedBlockIds(String appId, int shuffleId) {
+    Map<Integer, BlockIdSet> shuffleIdToBlockIds =
         shuffleTaskInfos.getOrDefault(appId, new ShuffleTaskInfo(appId)).getCachedBlockIds();
-    Roaring64NavigableMap blockIds = shuffleIdToBlockIds.get(shuffleId);
+    BlockIdSet blockIds = shuffleIdToBlockIds.get(shuffleId);
     if (blockIds == null) {
       LOG.warn(
           "Unexpected value when getCachedBlockIds for appId["
@@ -473,7 +460,7 @@
               + "], shuffleId["
               + shuffleId
               + "]");
-      return Roaring64NavigableMap.bitmapOf();
+      return BlockIdSet.empty();
     }
     return blockIds;
   }
@@ -551,12 +538,12 @@
         storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
       }
     }
-    Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
+    Map<Integer, BlockIdSet[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
     if (shuffleIdToPartitions == null) {
       return null;
     }
 
-    Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
+    BlockIdSet[] blockIds = shuffleIdToPartitions.get(shuffleId);
     if (blockIds == null) {
       return new byte[] {};
     }
@@ -571,29 +558,29 @@
       }
     }
 
-    Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet res = BlockIdSet.empty();
     for (Map.Entry<Integer, Set<Integer>> entry : bitmapIndexToPartitions.entrySet()) {
       Set<Integer> requestPartitions = entry.getValue();
-      Roaring64NavigableMap bitmap = blockIds[entry.getKey()];
-      getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout);
+      BlockIdSet set = blockIds[entry.getKey()];
+      getBlockIdsByPartitionId(requestPartitions, set, res, blockIdLayout);
     }
-    return RssUtils.serializeBitMap(res);
+    return res.serialize();
   }
 
-  // filter the specific partition blockId in the bitmap to the resultBitmap
+  // copy the block ids of the requested partitions from blockIds into result
-  protected Roaring64NavigableMap getBlockIdsByPartitionId(
+  protected BlockIdSet getBlockIdsByPartitionId(
       Set<Integer> requestPartitions,
-      Roaring64NavigableMap bitmap,
-      Roaring64NavigableMap resultBitmap,
+      BlockIdSet blockIds,
+      BlockIdSet result,
       BlockIdLayout blockIdLayout) {
-    bitmap.forEach(
+    blockIds.forEach(
         blockId -> {
           int partitionId = blockIdLayout.getPartitionId(blockId);
           if (requestPartitions.contains(partitionId)) {
-            resultBitmap.addLong(blockId);
+            result.add(blockId);
           }
         });
-    return resultBitmap;
+    return result;
   }
 
   public ShuffleDataResult getInMemoryShuffleData(
@@ -857,7 +844,7 @@
   }
 
   @VisibleForTesting
-  public Map<String, Map<Integer, Roaring64NavigableMap[]>> getPartitionsToBlockIds() {
+  public Map<String, Map<Integer, BlockIdSet[]>> getPartitionsToBlockIds() {
     return partitionsToBlockIds;
   }
 
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 2b21445..51c5ddf 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -45,13 +45,13 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
@@ -670,11 +670,11 @@
       List<ShufflePartitionedBlock> blocks,
       int partitionNumPerRange,
       String basePath) {
-    Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
-    Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet expectBlockIds = BlockIdSet.empty();
+    BlockIdSet processBlockIds = BlockIdSet.empty();
     Set<Long> remainIds = Sets.newHashSet();
     for (ShufflePartitionedBlock spb : blocks) {
-      expectBlockIds.addLong(spb.getBlockId());
+      expectBlockIds.add(spb.getBlockId());
       remainIds.add(spb.getBlockId());
     }
     HadoopClientReadHandler handler =
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 75c49cd..008192f 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -55,6 +55,7 @@
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
@@ -769,44 +770,38 @@
     ShuffleServerConf conf = new ShuffleServerConf();
     ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, null, null, null);
 
-    Roaring64NavigableMap expectedBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet expectedBlockIds = BlockIdSet.empty();
     int expectedPartitionId = 5;
-    Roaring64NavigableMap bitmapBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet bitmapBlockIds = BlockIdSet.empty();
     for (int taskId = 1; taskId < 10; taskId++) {
       for (int partitionId = 1; partitionId < 10; partitionId++) {
         for (int i = 0; i < 2; i++) {
           long blockId = layout.getBlockId(i, partitionId, taskId);
-          bitmapBlockIds.addLong(blockId);
+          bitmapBlockIds.add(blockId);
           if (partitionId == expectedPartitionId) {
-            expectedBlockIds.addLong(blockId);
+            expectedBlockIds.add(blockId);
           }
         }
       }
     }
-    Roaring64NavigableMap resultBlockIds =
+    BlockIdSet resultBlockIds =
         shuffleTaskManager.getBlockIdsByPartitionId(
-            Sets.newHashSet(expectedPartitionId),
-            bitmapBlockIds,
-            Roaring64NavigableMap.bitmapOf(),
-            layout);
+            Sets.newHashSet(expectedPartitionId), bitmapBlockIds, BlockIdSet.empty(), layout);
     assertEquals(expectedBlockIds, resultBlockIds);
 
-    bitmapBlockIds.addLong(layout.getBlockId(0, 0, 0));
+    bitmapBlockIds.add(layout.getBlockId(0, 0, 0));
     resultBlockIds =
         shuffleTaskManager.getBlockIdsByPartitionId(
-            Sets.newHashSet(0), bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout);
-    assertEquals(Roaring64NavigableMap.bitmapOf(0L), resultBlockIds);
+            Sets.newHashSet(0), bitmapBlockIds, BlockIdSet.empty(), layout);
+    assertEquals(BlockIdSet.of(0L), resultBlockIds);
 
     long expectedBlockId =
         layout.getBlockId(layout.maxSequenceNo, layout.maxPartitionId, layout.maxTaskAttemptId);
-    bitmapBlockIds.addLong(expectedBlockId);
+    bitmapBlockIds.add(expectedBlockId);
     resultBlockIds =
         shuffleTaskManager.getBlockIdsByPartitionId(
-            Sets.newHashSet(layout.maxPartitionId),
-            bitmapBlockIds,
-            Roaring64NavigableMap.bitmapOf(),
-            layout);
-    assertEquals(Roaring64NavigableMap.bitmapOf(expectedBlockId), resultBlockIds);
+            Sets.newHashSet(layout.maxPartitionId), bitmapBlockIds, BlockIdSet.empty(), layout);
+    assertEquals(BlockIdSet.of(expectedBlockId), resultBlockIds);
   }
 
   @Test
@@ -815,17 +810,17 @@
     ShuffleServerConf conf = new ShuffleServerConf();
     ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, null, null, null);
 
-    Roaring64NavigableMap expectedBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet expectedBlockIds = BlockIdSet.empty();
     int startPartition = 3;
     int endPartition = 5;
-    Roaring64NavigableMap bitmapBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet bitmapBlockIds = BlockIdSet.empty();
     for (int taskId = 1; taskId < 10; taskId++) {
       for (int partitionId = 1; partitionId < 10; partitionId++) {
         for (int i = 0; i < 2; i++) {
           long blockId = layout.getBlockId(i, partitionId, taskId);
-          bitmapBlockIds.addLong(blockId);
+          bitmapBlockIds.add(blockId);
           if (partitionId >= startPartition && partitionId <= endPartition) {
-            expectedBlockIds.addLong(blockId);
+            expectedBlockIds.add(blockId);
           }
         }
       }
@@ -839,14 +834,14 @@
       }
     }
 
-    Roaring64NavigableMap resultBlockIds =
+    BlockIdSet resultBlockIds =
         shuffleTaskManager.getBlockIdsByPartitionId(
-            requestPartitions, bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout);
+            requestPartitions, bitmapBlockIds, BlockIdSet.empty(), layout);
     assertEquals(expectedBlockIds, resultBlockIds);
     assertEquals(
         bitmapBlockIds,
         shuffleTaskManager.getBlockIdsByPartitionId(
-            allPartitions, bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout));
+            allPartitions, bitmapBlockIds, BlockIdSet.empty(), layout));
   }
 
   @Test
@@ -1101,11 +1096,11 @@
       int partitionId,
       List<ShufflePartitionedBlock> blocks,
       String basePath) {
-    Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
-    Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet expectBlockIds = BlockIdSet.empty();
+    BlockIdSet processBlockIds = BlockIdSet.empty();
     Set<Long> remainIds = Sets.newHashSet();
     for (ShufflePartitionedBlock spb : blocks) {
-      expectBlockIds.addLong(spb.getBlockId());
+      expectBlockIds.add(spb.getBlockId());
       remainIds.add(spb.getBlockId());
     }
     HadoopClientReadHandler handler =
diff --git a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index 4edca3b..4fe7dbd 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -29,6 +29,7 @@
 import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.storage.handler.api.ClientReadHandler;
 import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
@@ -123,8 +124,8 @@
             .getShuffleServerClient(request.getClientType().name(), ssi, request.getClientConf());
     Roaring64NavigableMap expectTaskIds = null;
     if (request.isExpectedTaskIdsBitmapFilterEnable()) {
-      Roaring64NavigableMap realExceptBlockIds = RssUtils.cloneBitMap(request.getExpectBlockIds());
-      realExceptBlockIds.xor(request.getProcessBlockIds());
+      BlockIdSet realExceptBlockIds = request.getExpectBlockIds().copy();
+      realExceptBlockIds.removeAll(request.getProcessBlockIds());
       expectTaskIds = RssUtils.generateTaskIdBitMap(realExceptBlockIds, request.getIdHelper());
     }
     ClientReadHandler memoryClientReadHandler =
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
index 220e029..1ac3590 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
@@ -24,11 +24,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.segment.SegmentSplitterFactory;
+import org.apache.uniffle.common.util.BlockIdSet;
 
 public abstract class DataSkippableReadHandler extends AbstractClientReadHandler {
   private static final Logger LOG = LoggerFactory.getLogger(DataSkippableReadHandler.class);
@@ -36,8 +38,8 @@
   protected List<ShuffleDataSegment> shuffleDataSegments = Lists.newArrayList();
   protected int segmentIndex = 0;
 
-  protected Roaring64NavigableMap expectBlockIds;
-  protected Roaring64NavigableMap processBlockIds;
+  protected BlockIdSet expectBlockIds;
+  protected BlockIdSet processBlockIds;
 
   protected ShuffleDataDistributionType distributionType;
   protected Roaring64NavigableMap expectTaskIds;
@@ -47,8 +49,8 @@
       int shuffleId,
       int partitionId,
       int readBufferSize,
-      Roaring64NavigableMap expectBlockIds,
-      Roaring64NavigableMap processBlockIds,
+      BlockIdSet expectBlockIds,
+      BlockIdSet processBlockIds,
       ShuffleDataDistributionType distributionType,
       Roaring64NavigableMap expectTaskIds) {
     this.appId = appId;
@@ -87,14 +89,14 @@
     ShuffleDataResult result = null;
     while (segmentIndex < shuffleDataSegments.size()) {
       ShuffleDataSegment segment = shuffleDataSegments.get(segmentIndex);
-      Roaring64NavigableMap blocksOfSegment = Roaring64NavigableMap.bitmapOf();
-      segment.getBufferSegments().forEach(block -> blocksOfSegment.addLong(block.getBlockId()));
+      BlockIdSet blocksOfSegment = BlockIdSet.empty();
+      blocksOfSegment.addAll(
+          segment.getBufferSegments().stream().mapToLong(BufferSegment::getBlockId));
       // skip unexpected blockIds
-      blocksOfSegment.and(expectBlockIds);
+      blocksOfSegment.retainAll(expectBlockIds);
       if (!blocksOfSegment.isEmpty()) {
         // skip processed blockIds
-        blocksOfSegment.or(processBlockIds);
-        blocksOfSegment.xor(processBlockIds);
+        blocksOfSegment.removeAll(processBlockIds);
         if (!blocksOfSegment.isEmpty()) {
           result = readShuffleData(segment);
           segmentIndex++;
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java
index 1cf636e..d745bb7 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java
@@ -35,6 +35,7 @@
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
@@ -46,8 +47,8 @@
   protected final int partitionNum;
   protected final int readBufferSize;
   private final String shuffleServerId;
-  protected Roaring64NavigableMap expectBlockIds;
-  protected Roaring64NavigableMap processBlockIds;
+  protected BlockIdSet expectBlockIds;
+  protected BlockIdSet processBlockIds;
   protected final String storageBasePath;
   protected final Configuration hadoopConf;
   protected final List<HadoopShuffleReadHandler> readHandlers = Lists.newArrayList();
@@ -64,8 +65,8 @@
       int partitionNumPerRange,
       int partitionNum,
       int readBufferSize,
-      Roaring64NavigableMap expectBlockIds,
-      Roaring64NavigableMap processBlockIds,
+      BlockIdSet expectBlockIds,
+      BlockIdSet processBlockIds,
       String storageBasePath,
       Configuration hadoopConf,
       ShuffleDataDistributionType distributionType,
@@ -98,8 +99,8 @@
       int partitionNumPerRange,
       int partitionNum,
       int readBufferSize,
-      Roaring64NavigableMap expectBlockIds,
-      Roaring64NavigableMap processBlockIds,
+      BlockIdSet expectBlockIds,
+      BlockIdSet processBlockIds,
       String storageBasePath,
       Configuration hadoopConf) {
     this(
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandler.java
index c7af921..37ca5ba 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandler.java
@@ -31,6 +31,7 @@
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
@@ -52,8 +53,8 @@
       int partitionId,
       String filePrefix,
       int readBufferSize,
-      Roaring64NavigableMap expectBlockIds,
-      Roaring64NavigableMap processBlockIds,
+      BlockIdSet expectBlockIds,
+      BlockIdSet processBlockIds,
       Configuration conf,
       ShuffleDataDistributionType distributionType,
       Roaring64NavigableMap expectTaskIds,
@@ -83,8 +84,8 @@
       int partitionId,
       String filePrefix,
       int readBufferSize,
-      Roaring64NavigableMap expectBlockIds,
-      Roaring64NavigableMap processBlockIds,
+      BlockIdSet expectBlockIds,
+      BlockIdSet processBlockIds,
       Configuration conf)
       throws Exception {
     this(
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
index 2b5ea8f..881f312 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
@@ -32,6 +32,7 @@
 import org.apache.uniffle.common.ShuffleIndexResult;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.exception.RssFetchFailedException;
+import org.apache.uniffle.common.util.BlockIdSet;
 
 public class LocalFileClientReadHandler extends DataSkippableReadHandler {
   private static final Logger LOG = LoggerFactory.getLogger(LocalFileClientReadHandler.class);
@@ -49,8 +50,8 @@
       int partitionNumPerRange,
       int partitionNum,
       int readBufferSize,
-      Roaring64NavigableMap expectBlockIds,
-      Roaring64NavigableMap processBlockIds,
+      BlockIdSet expectBlockIds,
+      BlockIdSet processBlockIds,
       ShuffleServerClient shuffleServerClient,
       ShuffleDataDistributionType distributionType,
       Roaring64NavigableMap expectTaskIds,
@@ -81,8 +82,8 @@
       int partitionNumPerRange,
       int partitionNum,
       int readBufferSize,
-      Roaring64NavigableMap expectBlockIds,
-      Roaring64NavigableMap processBlockIds,
+      BlockIdSet expectBlockIds,
+      BlockIdSet processBlockIds,
       ShuffleServerClient shuffleServerClient) {
     this(
         appId,
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
index c3e2200..750c5e3 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
@@ -19,7 +19,6 @@
 
 import java.util.List;
 
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,6 +26,7 @@
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.storage.handler.api.ClientReadHandler;
 
@@ -36,16 +36,16 @@
 
   private final List<ClientReadHandler> handlers;
   private final List<ShuffleServerInfo> shuffleServerInfos;
-  private final Roaring64NavigableMap blockIdBitmap;
-  private final Roaring64NavigableMap processedBlockIds;
+  private final BlockIdSet blockIdBitmap;
+  private final BlockIdSet processedBlockIds;
 
   private int readHandlerIndex;
 
   public MultiReplicaClientReadHandler(
       List<ClientReadHandler> handlers,
       List<ShuffleServerInfo> shuffleServerInfos,
-      Roaring64NavigableMap blockIdBitmap,
-      Roaring64NavigableMap processedBlockIds) {
+      BlockIdSet blockIdBitmap,
+      BlockIdSet processedBlockIds) {
     this.handlers = handlers;
     this.blockIdBitmap = blockIdBitmap;
     this.processedBlockIds = processedBlockIds;
diff --git a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
index 9b73dc8..501fa4f 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
@@ -27,6 +27,7 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.IdHelper;
 
 public class CreateShuffleReadHandlerRequest {
@@ -45,8 +46,8 @@
   private RssBaseConf rssBaseConf;
   private Configuration hadoopConf;
   private List<ShuffleServerInfo> shuffleServerInfoList;
-  private Roaring64NavigableMap expectBlockIds;
-  private Roaring64NavigableMap processBlockIds;
+  private BlockIdSet expectBlockIds;
+  private BlockIdSet processBlockIds;
   private ShuffleDataDistributionType distributionType;
   private Roaring64NavigableMap expectTaskIds;
   private boolean expectedTaskIdsBitmapFilterEnable;
@@ -171,19 +172,19 @@
     this.hadoopConf = hadoopConf;
   }
 
-  public void setExpectBlockIds(Roaring64NavigableMap expectBlockIds) {
+  public void setExpectBlockIds(BlockIdSet expectBlockIds) {
     this.expectBlockIds = expectBlockIds;
   }
 
-  public Roaring64NavigableMap getExpectBlockIds() {
+  public BlockIdSet getExpectBlockIds() {
     return expectBlockIds;
   }
 
-  public void setProcessBlockIds(Roaring64NavigableMap processBlockIds) {
+  public void setProcessBlockIds(BlockIdSet processBlockIds) {
     this.processBlockIds = processBlockIds;
   }
 
-  public Roaring64NavigableMap getProcessBlockIds() {
+  public BlockIdSet getProcessBlockIds() {
     return processBlockIds;
   }
 
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandlerTest.java
index fa684b8..49277a1 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandlerTest.java
@@ -27,11 +27,11 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.Test;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.storage.HadoopTestBase;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -52,7 +52,7 @@
         new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, "test", hadoopConf, writeUser);
 
     Map<Long, byte[]> expectedData = Maps.newHashMap();
-    Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet expectBlockIds = BlockIdSet.empty();
 
     int readBufferSize = 13;
     int total = 0;
@@ -65,7 +65,7 @@
       writeTestData(writeHandler, num, 3, 0, expectedData);
       total += calcExpectedSegmentNum(num, 3, readBufferSize);
       expectTotalBlockNum += num;
-      expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
+      expectedData.forEach((id, block) -> expectBlockIds.add(id));
     }
 
     /** This part is to check the fault tolerance of reading HDFS incomplete index file */
@@ -75,7 +75,7 @@
     indexWriter.writeData(ByteBuffer.allocate(4).putInt(999).array());
     indexWriter.close();
 
-    Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet processBlockIds = BlockIdSet.empty();
 
     HadoopShuffleReadHandler indexReader =
         new HadoopShuffleReadHandler(
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopHandlerTest.java
index 94c5f3c..241cad3 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopHandlerTest.java
@@ -28,11 +28,11 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.storage.HadoopTestBase;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
 
@@ -97,10 +97,10 @@
       List<byte[]> expectedData,
       List<Long> expectedBlockId)
       throws IllegalStateException {
-    Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
-    Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet expectBlockIds = BlockIdSet.empty();
+    BlockIdSet processBlockIds = BlockIdSet.empty();
     for (long blockId : expectedBlockId) {
-      expectBlockIds.addLong(blockId);
+      expectBlockIds.add(blockId);
     }
     // read directly and compare
     HadoopClientReadHandler readHandler =
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
index c11fc27..af20021 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
@@ -31,12 +31,12 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.Test;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.storage.HadoopShuffleHandlerTestBase;
 import org.apache.uniffle.storage.HadoopTestBase;
@@ -65,9 +65,9 @@
     int total =
         HadoopShuffleHandlerTestBase.calcExpectedSegmentNum(
             expectTotalBlockNum, blockSize, readBufferSize);
-    Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
-    Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
-    expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
+    BlockIdSet expectBlockIds = BlockIdSet.empty();
+    BlockIdSet processBlockIds = BlockIdSet.empty();
+    expectedData.forEach((id, block) -> expectBlockIds.add(id));
     String fileNamePrefix =
         ShuffleStorageUtils.getFullShuffleDataFolder(
                 basePath, ShuffleStorageUtils.getShuffleDataPathWithRange("appId", 0, 1, 1, 10))
@@ -129,9 +129,9 @@
     int total =
         HadoopShuffleHandlerTestBase.calcExpectedSegmentNum(
             expectTotalBlockNum, blockSize, readBufferSize);
-    Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
-    Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
-    expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
+    BlockIdSet expectBlockIds = BlockIdSet.empty();
+    BlockIdSet processBlockIds = BlockIdSet.empty();
+    expectedData.forEach((id, block) -> expectBlockIds.add(id));
     String fileNamePrefix =
         ShuffleStorageUtils.getFullShuffleDataFolder(
                 basePath, ShuffleStorageUtils.getShuffleDataPathWithRange("appId", 0, 1, 1, 10))
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
index 2a55ae4..b9fadc3 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
@@ -28,7 +28,6 @@
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
@@ -38,6 +37,7 @@
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -50,7 +50,7 @@
     int blockSize = 7;
 
     ByteBuffer byteBuffer = ByteBuffer.allocate(expectTotalBlockNum * 40);
-    Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet expectBlockIds = BlockIdSet.empty();
 
     // We simulate the generation of 4 block index files and 3 block data files to test
     // LocalFileClientReadHandler
@@ -77,7 +77,7 @@
         new HashSet<>());
     byteBuffer.rewind();
 
-    blocks.forEach(block -> expectBlockIds.addLong(block.getBlockId()));
+    blocks.forEach(block -> expectBlockIds.add(block.getBlockId()));
 
     String appId = "app1";
     int shuffleId = 1;
@@ -122,7 +122,7 @@
         .when(mockShuffleServerClient)
         .getShuffleData(Mockito.argThat(segment2Match));
 
-    Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+    BlockIdSet processBlockIds = BlockIdSet.empty();
     LocalFileClientReadHandler handler =
         new LocalFileClientReadHandler(
             appId,