Hide implementation of block id set behind BlockIdSet interface
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
index f761c6e..0795a55 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
@@ -32,12 +32,12 @@
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.storage.HadoopTestBase;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -67,7 +67,7 @@
int blockNum,
int recordNum,
Map<String, String> expectedData,
- Roaring64NavigableMap blockIdBitmap,
+ BlockIdSet blockIdBitmap,
String keyPrefix,
Serializer serializer,
int partitionID)
@@ -91,7 +91,7 @@
int recordNum,
BlockIdLayout layout,
Map<String, String> expectedData,
- Roaring64NavigableMap blockIdBitmap,
+ BlockIdSet blockIdBitmap,
String keyPrefix,
Serializer serializer,
int partitionID)
@@ -114,7 +114,7 @@
int blockNum,
int recordNum,
Map<String, String> expectedData,
- Roaring64NavigableMap blockIdBitmap,
+ BlockIdSet blockIdBitmap,
String keyPrefix,
Serializer serializer,
int partitionID,
@@ -139,7 +139,7 @@
int recordNum,
BlockIdLayout layout,
Map<String, String> expectedData,
- Roaring64NavigableMap blockIdBitmap,
+ BlockIdSet blockIdBitmap,
String keyPrefix,
Serializer serializer,
int partitionID,
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
index 3f6993c..490bf62 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
@@ -48,6 +48,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
import org.apache.uniffle.storage.util.StorageType;
@@ -82,7 +83,7 @@
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
Map<String, String> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(
writeHandler, 2, 5, layout, expectedData, blockIdBitmap, "key", KRYO_SERIALIZER, 0);
@@ -111,7 +112,7 @@
private RssShuffleDataIterator getDataIterator(
String basePath,
- Roaring64NavigableMap blockIdBitmap,
+ BlockIdSet blockIdBitmap,
Roaring64NavigableMap taskIdBitmap,
List<ShuffleServerInfo> serverInfos) {
return getDataIterator(basePath, blockIdBitmap, taskIdBitmap, serverInfos, true);
@@ -119,7 +120,7 @@
private RssShuffleDataIterator getDataIterator(
String basePath,
- Roaring64NavigableMap blockIdBitmap,
+ BlockIdSet blockIdBitmap,
Roaring64NavigableMap taskIdBitmap,
List<ShuffleServerInfo> serverInfos,
boolean compress) {
@@ -164,7 +165,7 @@
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);
Map<String, String> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler1, 2, 5, expectedData, blockIdBitmap, "key1", KRYO_SERIALIZER, 0);
writeTestData(writeHandler2, 2, 5, expectedData, blockIdBitmap, "key2", KRYO_SERIALIZER, 0);
@@ -205,7 +206,7 @@
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
Map<String, String> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 2, 5, expectedData, blockIdBitmap, "key", KRYO_SERIALIZER, 0);
@@ -238,7 +239,7 @@
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
Map<String, String> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 2, 5, expectedData, blockIdBitmap, "key", KRYO_SERIALIZER, 0);
@@ -264,7 +265,7 @@
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
Map<String, String> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 2, 5, expectedData, blockIdBitmap, "key", KRYO_SERIALIZER, 0);
@@ -311,7 +312,7 @@
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);
Map<String, String> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(
writeHandler1, 2, 5, expectedData, blockIdBitmap, "key1", KRYO_SERIALIZER, 0, compress);
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 6f5b255..fadf085 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -75,6 +75,7 @@
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.rpc.GrpcServer;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.common.util.RssUtils;
@@ -573,7 +574,7 @@
}
Map<Integer, List<ShuffleServerInfo>> partitionToServers =
shuffleHandleInfo.getAllPartitionServersForReader();
- Roaring64NavigableMap blockIdBitmap =
+ BlockIdSet blockIdBitmap =
getShuffleResult(
clientType,
Sets.newHashSet(partitionToServers.get(startPartition)),
@@ -790,7 +791,7 @@
return shuffleIdToNumMapTasks.getOrDefault(shuffleId, 0);
}
- private Roaring64NavigableMap getShuffleResult(
+ private BlockIdSet getShuffleResult(
String clientType,
Set<ShuffleServerInfo> shuffleServerInfoSet,
String appId,
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 8f5118e..a40fa4d 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -53,6 +53,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockIdSet;
public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
@@ -70,7 +71,7 @@
private String basePath;
private int partitionNumPerRange;
private int partitionNum;
- private Roaring64NavigableMap blockIdBitmap;
+ private BlockIdSet blockIdBitmap;
private Roaring64NavigableMap taskIdBitmap;
private List<ShuffleServerInfo> shuffleServerInfoList;
private Configuration hadoopConf;
@@ -85,7 +86,7 @@
Configuration hadoopConf,
int partitionNumPerRange,
int partitionNum,
- Roaring64NavigableMap blockIdBitmap,
+ BlockIdSet blockIdBitmap,
Roaring64NavigableMap taskIdBitmap,
RssConf rssConf,
Map<Integer, List<ShuffleServerInfo>> partitionToServers) {
diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
index f09223b..54ee1a1 100644
--- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
+++ b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
@@ -38,6 +38,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
import org.apache.uniffle.storage.util.StorageType;
@@ -58,7 +59,7 @@
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi.getId(), conf);
Map<String, String> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 2, 5, expectedData, blockIdBitmap, "key", KRYO_SERIALIZER, 0);
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index f70e38a..46b5ffb 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -89,6 +89,7 @@
import org.apache.uniffle.common.rpc.GrpcServer;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.common.util.RssUtils;
@@ -700,7 +701,7 @@
Map<ShuffleServerInfo, Set<Integer>> serverToPartitions =
getPartitionDataServers(shuffleHandleInfo, startPartition, endPartition);
long start = System.currentTimeMillis();
- Roaring64NavigableMap blockIdBitmap =
+ BlockIdSet blockIdBitmap =
getShuffleResultForMultiPart(
clientType,
serverToPartitions,
@@ -1108,7 +1109,7 @@
return !failedTaskIds.contains(taskId);
}
- private Roaring64NavigableMap getShuffleResultForMultiPart(
+ private BlockIdSet getShuffleResultForMultiPart(
String clientType,
Map<ShuffleServerInfo, Set<Integer>> serverToPartitions,
String appId,
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 9b17634..446a359 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -55,6 +55,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockIdSet;
import static org.apache.uniffle.common.util.Constants.DRIVER_HOST;
@@ -73,7 +74,7 @@
private String taskId;
private String basePath;
private int partitionNum;
- private Map<Integer, Roaring64NavigableMap> partitionToExpectBlocks;
+ private Map<Integer, BlockIdSet> partitionToExpectBlocks;
private Roaring64NavigableMap taskIdBitmap;
private Configuration hadoopConf;
private int mapStartIndex;
@@ -92,7 +93,7 @@
String basePath,
Configuration hadoopConf,
int partitionNum,
- Map<Integer, Roaring64NavigableMap> partitionToExpectBlocks,
+ Map<Integer, BlockIdSet> partitionToExpectBlocks,
Roaring64NavigableMap taskIdBitmap,
ShuffleReadMetrics readMetrics,
RssConf rssConf,
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
index aaff4cb..ec4b7c7 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
@@ -40,6 +40,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
import org.apache.uniffle.storage.util.StorageType;
@@ -61,10 +62,10 @@
final HadoopShuffleWriteHandler writeHandler1 =
new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi.getId(), conf);
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
Map<String, String> expectedData = Maps.newHashMap();
- final Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
+ final BlockIdSet blockIdBitmap1 = BlockIdSet.empty();
writeTestData(writeHandler, 2, 5, expectedData, blockIdBitmap, "key", KRYO_SERIALIZER, 0);
RssShuffleHandle<String, String, String> handleMock = mock(RssShuffleHandle.class);
@@ -87,7 +88,7 @@
when(dependencyMock.keyOrdering()).thenReturn(Option.empty());
when(dependencyMock.mapSideCombine()).thenReturn(false);
- Map<Integer, Roaring64NavigableMap> partitionToExpectBlocks = Maps.newHashMap();
+ Map<Integer, BlockIdSet> partitionToExpectBlocks = Maps.newHashMap();
partitionToExpectBlocks.put(0, blockIdBitmap);
RssConf rssConf = new RssConf();
rssConf.set(RssClientConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
diff --git a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index 7d8f533..c90950d 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -22,8 +22,6 @@
import java.util.Set;
import java.util.function.Supplier;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
-
import org.apache.uniffle.client.PartitionDataReplicaRequirementTracking;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.PartitionRange;
@@ -32,6 +30,7 @@
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.BlockIdSet;
public interface ShuffleWriteClient {
@@ -92,14 +91,14 @@
int assignmentShuffleServerNumber,
int estimateTaskConcurrency);
- Roaring64NavigableMap getShuffleResult(
+ BlockIdSet getShuffleResult(
String clientType,
Set<ShuffleServerInfo> shuffleServerInfoSet,
String appId,
int shuffleId,
int partitionId);
- Roaring64NavigableMap getShuffleResultForMultiPart(
+ BlockIdSet getShuffleResultForMultiPart(
String clientType,
Map<ShuffleServerInfo, Set<Integer>> serverToPartitions,
String appId,
diff --git a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index 0eed01e..e4461a3 100644
--- a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++ b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -30,6 +30,7 @@
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.IdHelper;
public class ShuffleClientFactory {
@@ -199,7 +200,7 @@
private String basePath;
private int partitionNumPerRange;
private int partitionNum;
- private Roaring64NavigableMap blockIdBitmap;
+ private BlockIdSet blockIdBitmap;
private Roaring64NavigableMap taskIdBitmap;
private List<ShuffleServerInfo> shuffleServerInfoList;
private Configuration hadoopConf;
@@ -245,7 +246,7 @@
return this;
}
- public ReadClientBuilder blockIdBitmap(Roaring64NavigableMap blockIdBitmap) {
+ public ReadClientBuilder blockIdBitmap(BlockIdSet blockIdBitmap) {
this.blockIdBitmap = blockIdBitmap;
return this;
}
@@ -348,7 +349,7 @@
return basePath;
}
- public Roaring64NavigableMap getBlockIdBitmap() {
+ public BlockIdSet getBlockIdBitmap() {
return blockIdBitmap;
}
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index e1aa0f9..64246c4 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -43,6 +43,7 @@
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.IdHelper;
import org.apache.uniffle.common.util.RssUtils;
@@ -58,10 +59,10 @@
private int partitionId;
private ByteBuffer readBuffer;
private ShuffleDataResult sdr;
- private Roaring64NavigableMap blockIdBitmap;
+ private BlockIdSet blockIdBitmap;
private Roaring64NavigableMap taskIdBitmap;
- private Roaring64NavigableMap pendingBlockIds;
- private Roaring64NavigableMap processedBlockIds = Roaring64NavigableMap.bitmapOf();
+ private BlockIdSet pendingBlockIds;
+ private BlockIdSet processedBlockIds = BlockIdSet.empty();
private Queue<BufferSegment> bufferSegmentQueue = Queues.newLinkedBlockingQueue();
private AtomicLong readDataTime = new AtomicLong(0);
private AtomicLong copyTime = new AtomicLong(0);
@@ -188,12 +189,10 @@
}
});
- for (long rid : removeBlockIds) {
- blockIdBitmap.removeLong(rid);
- }
+ blockIdBitmap.removeAll(removeBlockIds.stream());
// copy blockIdBitmap to track all pending blocks
- pendingBlockIds = RssUtils.cloneBitMap(blockIdBitmap);
+ pendingBlockIds = blockIdBitmap.copy();
clientReadHandler = ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
}
@@ -266,16 +265,16 @@
}
// mark block as processed
- processedBlockIds.addLong(bs.getBlockId());
- pendingBlockIds.removeLong(bs.getBlockId());
+ processedBlockIds.add(bs.getBlockId());
+ pendingBlockIds.remove(bs.getBlockId());
// only update the statistics of necessary blocks
clientReadHandler.updateConsumedBlockInfo(bs, false);
break;
}
clientReadHandler.updateConsumedBlockInfo(bs, true);
// mark block as processed
- processedBlockIds.addLong(bs.getBlockId());
- pendingBlockIds.removeLong(bs.getBlockId());
+ processedBlockIds.add(bs.getBlockId());
+ pendingBlockIds.remove(bs.getBlockId());
}
if (bs != null) {
@@ -289,7 +288,7 @@
}
@VisibleForTesting
- protected Roaring64NavigableMap getProcessedBlockIds() {
+ protected BlockIdSet getProcessedBlockIds() {
return processedBlockIds;
}
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index b9e523b..f721594 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -39,7 +39,6 @@
import com.google.common.collect.Sets;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.security.UserGroupInformation;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,6 +91,7 @@
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.ThreadUtils;
@@ -768,7 +768,7 @@
}
@Override
- public Roaring64NavigableMap getShuffleResult(
+ public BlockIdSet getShuffleResult(
String clientType,
Set<ShuffleServerInfo> shuffleServerInfoSet,
String appId,
@@ -777,7 +777,7 @@
RssGetShuffleResultRequest request =
new RssGetShuffleResultRequest(appId, shuffleId, partitionId, blockIdLayout);
boolean isSuccessful = false;
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
int successCnt = 0;
for (ShuffleServerInfo ssi : shuffleServerInfoSet) {
try {
@@ -785,8 +785,8 @@
getShuffleServerClient(ssi).getShuffleResult(request);
if (response.getStatusCode() == StatusCode.SUCCESS) {
// merge into blockIds from multiple servers.
- Roaring64NavigableMap blockIdBitmapOfServer = response.getBlockIdBitmap();
- blockIdBitmap.or(blockIdBitmapOfServer);
+ BlockIdSet blockIdBitmapOfServer = response.getBlockIdBitmap();
+ blockIdBitmap.addAll(blockIdBitmapOfServer);
successCnt++;
if (successCnt >= replicaRead) {
isSuccessful = true;
@@ -812,14 +812,14 @@
}
@Override
- public Roaring64NavigableMap getShuffleResultForMultiPart(
+ public BlockIdSet getShuffleResultForMultiPart(
String clientType,
Map<ShuffleServerInfo, Set<Integer>> serverToPartitions,
String appId,
int shuffleId,
Set<Integer> failedPartitions,
PartitionDataReplicaRequirementTracking replicaRequirementTracking) {
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Set<Integer> allRequestedPartitionIds = new HashSet<>();
for (Map.Entry<ShuffleServerInfo, Set<Integer>> entry : serverToPartitions.entrySet()) {
ShuffleServerInfo shuffleServerInfo = entry.getKey();
@@ -838,8 +838,8 @@
getShuffleServerClient(shuffleServerInfo).getShuffleResultForMultiPart(request);
if (response.getStatusCode() == StatusCode.SUCCESS) {
// merge into blockIds from multiple servers.
- Roaring64NavigableMap blockIdBitmapOfServer = response.getBlockIdBitmap();
- blockIdBitmap.or(blockIdBitmapOfServer);
+ BlockIdSet blockIdBitmapOfServer = response.getBlockIdBitmap();
+ blockIdBitmap.addAll(blockIdBitmapOfServer);
for (Integer partitionId : requestPartitions) {
replicaRequirementTracking.markPartitionOfServerSuccessful(
partitionId, shuffleServerInfo);
diff --git a/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java b/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
index dd9ef62..fe3da70 100644
--- a/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
@@ -34,6 +34,7 @@
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.RssUtils;
import static org.apache.uniffle.client.util.ClientUtils.waitUntilDoneOrFail;
@@ -49,14 +50,14 @@
public void testGenerateTaskIdBitMap() {
int partitionId = 1;
BlockIdLayout layout = BlockIdLayout.DEFAULT;
- Roaring64NavigableMap blockIdMap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdMap = BlockIdSet.empty();
int taskSize = 10;
long[] except = new long[taskSize];
for (int i = 0; i < taskSize; i++) {
except[i] = i;
for (int j = 0; j < 100; j++) {
long blockId = layout.getBlockId(j, partitionId, i);
- blockIdMap.addLong(blockId);
+ blockIdMap.add(blockId);
}
}
Roaring64NavigableMap taskIdBitMap =
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
index 5d0d4b4..eb4f94f 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.client.impl;
import java.nio.ByteBuffer;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -30,7 +31,6 @@
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
-import org.roaringbitmap.longlong.LongIterator;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.TestUtils;
@@ -43,6 +43,7 @@
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.storage.HadoopTestBase;
import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
@@ -84,7 +85,7 @@
new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 2, 30, 1, 0, expectedData, blockIdBitmap);
ShuffleReadClientImpl readClient =
@@ -99,7 +100,7 @@
readClient.close();
BlockIdLayout layout = BlockIdLayout.DEFAULT;
- blockIdBitmap.addLong(layout.getBlockId(0, 0, layout.maxTaskAttemptId - 1));
+ blockIdBitmap.add(layout.getBlockId(0, 0, layout.maxTaskAttemptId - 1));
taskIdBitmap.addLong(layout.maxTaskAttemptId - 1);
readClient =
baseReadBuilder()
@@ -129,7 +130,7 @@
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler1, 2, 30, 0, 0, expectedData, blockIdBitmap);
writeTestData(writeHandler2, 2, 30, 0, 0, expectedData, blockIdBitmap);
@@ -157,7 +158,7 @@
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ final BlockIdSet blockIdBitmap = BlockIdSet.empty();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler1, 2, 30, 0, 0, expectedData, blockIdBitmap);
writeTestData(writeHandler2, 2, 30, 0, 0, expectedData, blockIdBitmap);
@@ -214,7 +215,7 @@
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap);
@@ -254,7 +255,7 @@
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap);
ShuffleReadClientImpl readClient =
@@ -282,11 +283,11 @@
Map<Long, byte[]> expectedData1 = Maps.newHashMap();
Map<Long, byte[]> expectedData2 = Maps.newHashMap();
- final Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
+ final BlockIdSet blockIdBitmap1 = BlockIdSet.empty();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 10, 30, 0, 0, expectedData1, blockIdBitmap1);
- Roaring64NavigableMap blockIdBitmap2 = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap2 = BlockIdSet.empty();
writeTestData(writeHandler, 10, 30, 0, 0, expectedData2, blockIdBitmap2);
writeTestData(writeHandler, 10, 30, 0, 0, expectedData1, blockIdBitmap1);
@@ -323,7 +324,7 @@
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap);
ShuffleReadClientImpl readClient =
@@ -375,7 +376,7 @@
.partitionId(0)
.partitionNumPerRange(2)
.basePath("basePath")
- .blockIdBitmap(Roaring64NavigableMap.bitmapOf())
+ .blockIdBitmap(BlockIdSet.empty())
.taskIdBitmap(Roaring64NavigableMap.bitmapOf())
.build();
assertNull(readClient.readShuffleBlockData());
@@ -390,14 +391,14 @@
BlockIdLayout layout = BlockIdLayout.DEFAULT;
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 5, 30, 0, 0, expectedData, blockIdBitmap);
- Roaring64NavigableMap wrongBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
- LongIterator iter = blockIdBitmap.getLongIterator();
+ BlockIdSet wrongBlockIdBitmap = BlockIdSet.empty();
+ Iterator<Long> iter = blockIdBitmap.stream().iterator();
while (iter.hasNext()) {
BlockId blockId = layout.asBlockId(iter.next());
- wrongBlockIdBitmap.addLong(
+ wrongBlockIdBitmap.add(
layout.getBlockId(blockId.sequenceNo, blockId.partitionId + 1, blockId.taskAttemptId));
}
@@ -425,7 +426,7 @@
new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 10, 30, 1, 0, expectedData, blockIdBitmap);
// test with different indexReadLimit to validate result
@@ -497,7 +498,7 @@
new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ final BlockIdSet blockIdBitmap = BlockIdSet.empty();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 1);
writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
writeTestData(writeHandler, 5, 30, 1, 2, Maps.newHashMap(), blockIdBitmap);
@@ -539,12 +540,11 @@
new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ final BlockIdSet blockIdBitmap = BlockIdSet.empty();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 3);
writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap, layout);
// test case: data generated by speculation task without report result
- writeTestData(
- writeHandler, 5, 30, 1, 1, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf(), layout);
+ writeTestData(writeHandler, 5, 30, 1, 1, Maps.newHashMap(), BlockIdSet.empty(), layout);
// test case: data generated by speculation task with report result
writeTestData(writeHandler, 5, 30, 1, 2, Maps.newHashMap(), blockIdBitmap, layout);
writeTestData(writeHandler, 5, 30, 1, 3, expectedData, blockIdBitmap, layout);
@@ -597,10 +597,10 @@
new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ final BlockIdSet blockIdBitmap = BlockIdSet.empty();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 2);
writeDuplicatedData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
- writeTestData(writeHandler, 5, 30, 1, 1, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
+ writeTestData(writeHandler, 5, 30, 1, 1, Maps.newHashMap(), BlockIdSet.empty());
writeTestData(writeHandler, 5, 30, 1, 2, expectedData, blockIdBitmap);
// unexpected taskAttemptId should be filtered
@@ -624,14 +624,14 @@
new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ final BlockIdSet blockIdBitmap = BlockIdSet.empty();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
- writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
- writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
+ writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(), BlockIdSet.empty());
+ writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(), BlockIdSet.empty());
writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
- writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
+ writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(), BlockIdSet.empty());
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient =
baseReadBuilder()
@@ -656,8 +656,8 @@
Map<Long, byte[]> expectedData0 = Maps.newHashMap();
Map<Long, byte[]> expectedData1 = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap0 = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap0 = BlockIdSet.empty();
+ BlockIdSet blockIdBitmap1 = BlockIdSet.empty();
writeTestData(writeHandler0, 2, 30, 0, 0, expectedData0, blockIdBitmap0);
writeTestData(writeHandler1, 2, 30, 1, 1, expectedData1, blockIdBitmap1);
@@ -695,7 +695,7 @@
int partitionId,
long taskAttemptId,
Map<Long, byte[]> expectedData,
- Roaring64NavigableMap blockIdBitmap,
+ BlockIdSet blockIdBitmap,
BlockIdLayout layout)
throws Exception {
List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
@@ -707,7 +707,7 @@
new ShufflePartitionedBlock(
length, length, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf));
expectedData.put(blockId, buf);
- blockIdBitmap.addLong(blockId);
+ blockIdBitmap.add(blockId);
}
writeHandler.write(blocks);
}
@@ -719,7 +719,7 @@
int partitionId,
long taskAttemptId,
Map<Long, byte[]> expectedData,
- Roaring64NavigableMap blockIdBitmap)
+ BlockIdSet blockIdBitmap)
throws Exception {
writeTestData(
writeHandler,
@@ -739,7 +739,7 @@
int partitionId,
long taskAttemptId,
Map<Long, byte[]> expectedData,
- Roaring64NavigableMap blockIdBitmap)
+ BlockIdSet blockIdBitmap)
throws Exception {
BlockIdLayout layout = BlockIdLayout.DEFAULT;
List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
@@ -753,7 +753,7 @@
blocks.add(spb);
blocks.add(spb);
expectedData.put(blockId, buf);
- blockIdBitmap.addLong(blockId);
+ blockIdBitmap.add(blockId);
}
writeHandler.write(blocks);
}
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
index d26e969..74a8ca4 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
@@ -49,6 +49,7 @@
import org.apache.uniffle.common.netty.IOMode;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.RssUtils;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -374,8 +375,7 @@
Set<ShuffleServerInfo> shuffleServerInfoSet =
Sets.newHashSet(new ShuffleServerInfo("id", "host", 0));
- Roaring64NavigableMap result =
- spyClient.getShuffleResult("GRPC", shuffleServerInfoSet, "appId", 1, 2);
+ BlockIdSet result = spyClient.getShuffleResult("GRPC", shuffleServerInfoSet, "appId", 1, 2);
verify(mockShuffleServerClient)
.getShuffleResult(argThat(request -> request.getBlockIdLayout().equals(layout)));
diff --git a/common/src/main/java/org/apache/uniffle/common/util/BlockIdSet.java b/common/src/main/java/org/apache/uniffle/common/util/BlockIdSet.java
new file mode 100644
index 0000000..2847fff
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/util/BlockIdSet.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.function.LongConsumer;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+
+/**
+ * A mutable set of block ids.
+ *
+ * <p>Implementations are thread-safe. The default bulk methods below synchronize on {@code this},
+ * so they are atomic only for implementations that guard their state with the same monitor (for
+ * example via {@code synchronized} methods).
+ */
+public interface BlockIdSet {
+  /** Adds the given block id to this set and returns this set. */
+  BlockIdSet add(long blockId);
+
+  /**
+   * Adds all block ids of the given set to this set and returns this set. Adding a set to itself
+   * is a no-op.
+   */
+  default BlockIdSet addAll(BlockIdSet blockIds) {
+    if (blockIds == this) {
+      // every element is already present; avoid streaming over this set while adding to it
+      return this;
+    }
+    return addAll(blockIds.stream());
+  }
+
+  /** Adds all block ids of the given stream while holding this set's monitor. */
+  default BlockIdSet addAll(Stream<Long> blockIds) {
+    synchronized (this) {
+      blockIds.forEach(this::add);
+    }
+    return this;
+  }
+
+  /** Adds all block ids of the given stream while holding this set's monitor. */
+  default BlockIdSet addAll(LongStream blockIds) {
+    synchronized (this) {
+      blockIds.forEach(this::add);
+    }
+    return this;
+  }
+
+  /** Removes the given block id from this set, if present, and returns this set. */
+  BlockIdSet remove(long blockId);
+
+  /**
+   * Removes all block ids of the given set from this set and returns this set. Removing a set
+   * from itself empties it.
+   */
+  default BlockIdSet removeAll(BlockIdSet blockIds) {
+    // removing from the set that backs the stream being consumed is unsafe, so iterate a copy
+    return removeAll(blockIds == this ? copy().stream() : blockIds.stream());
+  }
+
+  /** Removes all block ids of the given stream while holding this set's monitor. */
+  default BlockIdSet removeAll(Stream<Long> blockIds) {
+    synchronized (this) {
+      blockIds.forEach(this::remove);
+    }
+    return this;
+  }
+
+  /** Removes all block ids of the given stream while holding this set's monitor. */
+  default BlockIdSet removeAll(LongStream blockIds) {
+    synchronized (this) {
+      blockIds.forEach(this::remove);
+    }
+    return this;
+  }
+
+  /** Keeps only the block ids that are also in the given set and returns this set. */
+  BlockIdSet retainAll(BlockIdSet blockIds);
+
+  /** Returns whether this set contains the given block id. */
+  boolean contains(long blockId);
+
+  /** Returns whether this set contains every block id of the given set. */
+  boolean containsAll(BlockIdSet blockIds);
+
+  /** Returns the number of block ids in this set as an {@code int}. */
+  int getIntCardinality();
+
+  /** Returns the number of block ids in this set as a {@code long}. */
+  long getLongCardinality();
+
+  /** Returns whether this set contains no block ids. */
+  boolean isEmpty();
+
+  /** Calls {@code func} for every block id in this set. */
+  void forEach(LongConsumer func);
+
+  /** Returns a stream over the block ids of this set. */
+  LongStream stream();
+
+  /** Returns a copy of this set that can be modified independently of it. */
+  BlockIdSet copy();
+
+  /** Serializes this set into a byte array. */
+  byte[] serialize() throws IOException;
+
+  /** Creates a new empty set backed by the default implementation. */
+  static BlockIdSet empty() {
+    return new RoaringBitmapBlockIdSet();
+  }
+
+  /** Creates a new set backed by the default implementation containing the given block ids. */
+  static BlockIdSet of(long... blockIds) {
+    BlockIdSet set = empty();
+    set.addAll(Arrays.stream(blockIds));
+    return set;
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RoaringBitmapBlockIdSet.java b/common/src/main/java/org/apache/uniffle/common/util/RoaringBitmapBlockIdSet.java
new file mode 100644
index 0000000..309c655
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/util/RoaringBitmapBlockIdSet.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.io.IOException;
+import java.util.function.LongConsumer;
+import java.util.stream.LongStream;
+
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+/**
+ * {@link BlockIdSet} backed by a {@link Roaring64NavigableMap}.
+ *
+ * <p>{@code Roaring64NavigableMap} is not thread-safe, so every access this class makes to the
+ * bitmap holds the instance monitor. Operations taking another set first copy that set under its
+ * own monitor, so two sets are never locked at the same time and concurrent calls such as
+ * a.addAll(b) and b.addAll(a) cannot deadlock.
+ */
+public class RoaringBitmapBlockIdSet implements BlockIdSet {
+  // NOTE(review): kept public for existing callers; direct access to this field bypasses the
+  // synchronization of this class.
+  public final Roaring64NavigableMap bitmap;
+
+  /** Creates an empty set. */
+  public RoaringBitmapBlockIdSet() {
+    this(Roaring64NavigableMap.bitmapOf());
+  }
+
+  /** Wraps the given bitmap without copying it; it must not be modified elsewhere afterwards. */
+  public RoaringBitmapBlockIdSet(Roaring64NavigableMap bitmap) {
+    this.bitmap = bitmap;
+  }
+
+  @Override
+  public synchronized BlockIdSet add(long blockId) {
+    bitmap.addLong(blockId);
+    return this;
+  }
+
+  @Override
+  public BlockIdSet addAll(BlockIdSet blockIds) {
+    Roaring64NavigableMap others = snapshotOf(blockIds);
+    synchronized (this) {
+      bitmap.or(others);
+    }
+    return this;
+  }
+
+  @Override
+  public synchronized BlockIdSet remove(long blockId) {
+    bitmap.removeLong(blockId);
+    return this;
+  }
+
+  @Override
+  public BlockIdSet removeAll(BlockIdSet blockIds) {
+    Roaring64NavigableMap others = snapshotOf(blockIds);
+    synchronized (this) {
+      bitmap.andNot(others);
+    }
+    return this;
+  }
+
+  @Override
+  public BlockIdSet retainAll(BlockIdSet blockIds) {
+    Roaring64NavigableMap others = snapshotOf(blockIds);
+    synchronized (this) {
+      bitmap.and(others);
+    }
+    return this;
+  }
+
+  @Override
+  public synchronized boolean contains(long blockId) {
+    return bitmap.contains(blockId);
+  }
+
+  @Override
+  public boolean containsAll(BlockIdSet blockIds) {
+    if (blockIds == this) {
+      return true;
+    }
+    Roaring64NavigableMap expecteds = snapshotOf(blockIds);
+    Roaring64NavigableMap actuals;
+    synchronized (this) {
+      // quick check: no need for the expensive bitwise AND when both sets are equal
+      if (bitmap.equals(expecteds)) {
+        return true;
+      }
+      actuals = RssUtils.cloneBitMap(bitmap);
+    }
+    // every expected id is contained iff intersecting with them leaves them all in place
+    actuals.and(expecteds);
+    return actuals.equals(expecteds);
+  }
+
+  /**
+   * Returns whether {@code other} is a RoaringBitmapBlockIdSet with the same block ids. Never
+   * throws, as the {@link Object#equals} contract requires.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (!(other instanceof RoaringBitmapBlockIdSet)) {
+      return false;
+    }
+    Roaring64NavigableMap theirs = ((RoaringBitmapBlockIdSet) other).snapshot();
+    synchronized (this) {
+      return bitmap.equals(theirs);
+    }
+  }
+
+  /** Consistent with {@link #equals}: derived from the same bitmap that equals compares. */
+  @Override
+  public synchronized int hashCode() {
+    return bitmap.hashCode();
+  }
+
+  @Override
+  public synchronized int getIntCardinality() {
+    return bitmap.getIntCardinality();
+  }
+
+  @Override
+  public synchronized long getLongCardinality() {
+    return bitmap.getLongCardinality();
+  }
+
+  @Override
+  public synchronized boolean isEmpty() {
+    return bitmap.isEmpty();
+  }
+
+  /** Calls {@code func} for every block id while holding this set's monitor. */
+  @Override
+  public synchronized void forEach(LongConsumer func) {
+    bitmap.forEach(func::accept);
+  }
+
+  /**
+   * Streams a snapshot of the block ids: consuming it is safe while this set is modified
+   * concurrently, and later modifications are not reflected in the stream.
+   */
+  @Override
+  public LongStream stream() {
+    return snapshot().stream();
+  }
+
+  @Override
+  public BlockIdSet copy() {
+    return new RoaringBitmapBlockIdSet(snapshot());
+  }
+
+  @Override
+  public synchronized byte[] serialize() throws IOException {
+    return RssUtils.serializeBitMap(bitmap);
+  }
+
+  @Override
+  public synchronized String toString() {
+    return bitmap.toString();
+  }
+
+  /** Returns a private copy of the bitmap, taken while holding this set's monitor. */
+  private synchronized Roaring64NavigableMap snapshot() {
+    return RssUtils.cloneBitMap(bitmap);
+  }
+
+  /**
+   * Returns a private copy of the given set's ids; other implementations are copied element
+   * by element through {@link BlockIdSet#forEach}.
+   */
+  private static Roaring64NavigableMap snapshotOf(BlockIdSet blockIds) {
+    if (blockIds instanceof RoaringBitmapBlockIdSet) {
+      return ((RoaringBitmapBlockIdSet) blockIds).snapshot();
+    }
+    Roaring64NavigableMap copy = Roaring64NavigableMap.bitmapOf();
+    blockIds.forEach(copy::addLong);
+    return copy;
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 5c65f5e..e652e84 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -336,16 +336,13 @@
return hostName.replaceAll("[\\.-]", "_");
}
- public static Map<Integer, Roaring64NavigableMap> generatePartitionToBitmap(
- Roaring64NavigableMap shuffleBitmap,
- int startPartition,
- int endPartition,
- BlockIdLayout layout) {
- Map<Integer, Roaring64NavigableMap> result = Maps.newHashMap();
+ public static Map<Integer, BlockIdSet> generatePartitionToBitmap(
+ BlockIdSet shuffleBitmap, int startPartition, int endPartition, BlockIdLayout layout) {
+ Map<Integer, BlockIdSet> result = Maps.newHashMap();
for (int partitionId = startPartition; partitionId < endPartition; partitionId++) {
- result.computeIfAbsent(partitionId, key -> Roaring64NavigableMap.bitmapOf());
+ result.computeIfAbsent(partitionId, key -> BlockIdSet.empty());
}
- Iterator<Long> it = shuffleBitmap.iterator();
+ Iterator<Long> it = shuffleBitmap.stream().iterator();
while (it.hasNext()) {
Long blockId = it.next();
int partitionId = layout.getPartitionId(blockId);
@@ -373,33 +370,23 @@
}
public static void checkProcessedBlockIds(
- Roaring64NavigableMap blockIdBitmap, Roaring64NavigableMap processedBlockIds) {
+ BlockIdSet blockIdBitmap, BlockIdSet processedBlockIds) {
// processedBlockIds can be a superset of blockIdBitmap,
- // here we check that processedBlockIds has all bits of blockIdBitmap set
- // first a quick check:
- // we only need to do the bitwise AND when blockIdBitmap is not equal to processedBlockIds
- if (!blockIdBitmap.equals(processedBlockIds)) {
- Roaring64NavigableMap cloneBitmap;
- cloneBitmap = RssUtils.cloneBitMap(blockIdBitmap);
- cloneBitmap.and(processedBlockIds);
- if (!blockIdBitmap.equals(cloneBitmap)) {
- throw new RssException(
- "Blocks read inconsistent: expected "
- + blockIdBitmap.getLongCardinality()
- + " blocks, actual "
- + cloneBitmap.getLongCardinality()
- + " blocks");
- }
+ // so only require it to contain every id of blockIdBitmap; otherwise report how many were read
+ if (!processedBlockIds.containsAll(blockIdBitmap)) {
+ throw new RssException(
+ "Blocks read inconsistent: expected "
+ + blockIdBitmap.getLongCardinality()
+ + " blocks, actual "
+ + processedBlockIds.copy().retainAll(blockIdBitmap).getLongCardinality()
+ + " blocks");
}
}
public static Roaring64NavigableMap generateTaskIdBitMap(
- Roaring64NavigableMap blockIdBitmap, IdHelper idHelper) {
- Iterator<Long> iterator = blockIdBitmap.iterator();
+ BlockIdSet blockIdBitmap, IdHelper idHelper) {
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
- while (iterator.hasNext()) {
- taskIdBitmap.addLong(idHelper.getTaskAttemptId(iterator.next()));
- }
+ blockIdBitmap.forEach(blockId -> taskIdBitmap.addLong(idHelper.getTaskAttemptId(blockId)));
return taskIdBitmap;
}
diff --git a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index d9bfddb..2c6a828 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -237,23 +237,23 @@
@ParameterizedTest
@MethodSource("testBlockIdLayouts")
public void testShuffleBitmapToPartitionBitmap(BlockIdLayout layout) {
- Roaring64NavigableMap partition1Bitmap =
- Roaring64NavigableMap.bitmapOf(
+ BlockIdSet partition1Bitmap =
+ BlockIdSet.of(
layout.getBlockId(0, 0, 0),
layout.getBlockId(1, 0, 0),
layout.getBlockId(0, 0, 1),
layout.getBlockId(1, 0, 1));
- Roaring64NavigableMap partition2Bitmap =
- Roaring64NavigableMap.bitmapOf(
+ BlockIdSet partition2Bitmap =
+ BlockIdSet.of(
layout.getBlockId(0, 1, 0),
layout.getBlockId(1, 1, 0),
layout.getBlockId(0, 1, 1),
layout.getBlockId(1, 1, 1));
- Roaring64NavigableMap shuffleBitmap = Roaring64NavigableMap.bitmapOf();
- shuffleBitmap.or(partition1Bitmap);
- shuffleBitmap.or(partition2Bitmap);
+ BlockIdSet shuffleBitmap = BlockIdSet.empty();
+ shuffleBitmap.addAll(partition1Bitmap);
+ shuffleBitmap.addAll(partition2Bitmap);
assertEquals(8, shuffleBitmap.getLongCardinality());
- Map<Integer, Roaring64NavigableMap> toPartitionBitmap =
+ Map<Integer, BlockIdSet> toPartitionBitmap =
RssUtils.generatePartitionToBitmap(shuffleBitmap, 0, 2, layout);
assertEquals(2, toPartitionBitmap.size());
assertEquals(partition1Bitmap, toPartitionBitmap.get(0));
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
index 4029d1c..40ee1a2 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
@@ -53,6 +53,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.CoordinatorServer;
import org.apache.uniffle.server.ShuffleServer;
@@ -166,7 +167,7 @@
String appId = "ap_disk_error_data";
Map<Long, byte[]> expectedData = Maps.newHashMap();
Set<Long> expectedBlock1 = Sets.newHashSet();
- Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap1 = BlockIdSet.empty();
List<ShuffleBlockInfo> blocks1 =
createShuffleBlockList(0, 0, 1, 3, 25, blockIdBitmap1, expectedData);
RssRegisterShuffleRequest rr1 =
@@ -212,7 +213,7 @@
expectedData.clear();
partitionToBlocks.clear();
shuffleToBlocks.clear();
- Roaring64NavigableMap blockIdBitmap2 = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap2 = BlockIdSet.empty();
Set<Long> expectedBlock2 = Sets.newHashSet();
List<ShuffleBlockInfo> blocks2 =
createShuffleBlockList(0, 0, 2, 5, 30, blockIdBitmap2, expectedData);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java
index 8a2fe26..c26f015 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HybridStorageFaultToleranceBase.java
@@ -51,6 +51,7 @@
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
@@ -97,7 +98,7 @@
Map<Integer, List<Integer>> map = Maps.newHashMap();
map.put(0, Lists.newArrayList(0));
registerShuffle(appId, map, isNettyMode);
- Roaring64NavigableMap blockBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockBitmap = BlockIdSet.empty();
final List<ShuffleBlockInfo> blocks =
createShuffleBlockList(0, 0, 0, 40, 2 * 1024 * 1024, blockBitmap, expectedData);
makeChaos();
@@ -156,7 +157,7 @@
String appId,
int shuffleId,
int partitionId,
- Roaring64NavigableMap blockBitmap,
+ BlockIdSet blockBitmap,
Roaring64NavigableMap taskBitmap,
Map<Long, byte[]> expectedData,
boolean isNettyMode) {
@@ -186,11 +187,11 @@
.hadoopConf(conf)
.build();
CompressedShuffleBlock csb = readClient.readShuffleBlockData();
- Roaring64NavigableMap matched = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet matched = BlockIdSet.empty();
while (csb != null && csb.getByteBuffer() != null) {
for (Map.Entry<Long, byte[]> entry : expectedData.entrySet()) {
if (compareByte(entry.getValue(), csb.getByteBuffer())) {
- matched.addLong(entry.getKey());
+ matched.add(entry.getKey());
break;
}
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 6662a36..bac3d60 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -45,6 +45,7 @@
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.ServerType;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.CoordinatorServer;
@@ -249,7 +250,7 @@
String testAppId = "rpcFailedTest";
registerShuffleServer(testAppId, 3, 2, 2, true);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
// case1: When only 1 server is failed, the block sending should success
List<ShuffleBlockInfo> blocks =
@@ -264,13 +265,13 @@
Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, fakedShuffleServerInfo2));
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
- Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet failedBlockIdBitmap = BlockIdSet.empty();
+ BlockIdSet succBlockIdBitmap = BlockIdSet.empty();
for (Long blockId : result.getSuccessBlockIds()) {
- succBlockIdBitmap.addLong(blockId);
+ succBlockIdBitmap.add(blockId);
}
for (Long blockId : result.getFailedBlockIds()) {
- failedBlockIdBitmap.addLong(blockId);
+ failedBlockIdBitmap.add(blockId);
}
assertEquals(0, failedBlockIdBitmap.getLongCardinality());
assertEquals(blockIdBitmap, succBlockIdBitmap);
@@ -289,7 +290,7 @@
validateResult(readClient, expectedData);
// case2: When 2 servers are failed, the block sending should fail
- blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ blockIdBitmap = BlockIdSet.empty();
blocks =
createShuffleBlockList(
0,
@@ -302,13 +303,13 @@
Lists.newArrayList(
shuffleServerInfo0, fakedShuffleServerInfo1, fakedShuffleServerInfo2));
result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
- failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
- succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ failedBlockIdBitmap = BlockIdSet.empty();
+ succBlockIdBitmap = BlockIdSet.empty();
for (Long blockId : result.getSuccessBlockIds()) {
- succBlockIdBitmap.addLong(blockId);
+ succBlockIdBitmap.add(blockId);
}
for (Long blockId : result.getFailedBlockIds()) {
- failedBlockIdBitmap.addLong(blockId);
+ failedBlockIdBitmap.add(blockId);
}
assertEquals(blockIdBitmap, failedBlockIdBitmap);
assertEquals(0, succBlockIdBitmap.getLongCardinality());
@@ -379,7 +380,7 @@
public void case1() throws Exception {
String testAppId = "case1";
registerShuffleServer(testAppId, 3, 2, 2, true);
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
// only 1 server is timout, the block sending should success
enableTimeout((MockedShuffleServer) grpcShuffleServers.get(2), 500);
@@ -392,7 +393,7 @@
serverToPartitionToBlockIds.put(shuffleServerInfo1, partitionToBlockIds);
serverToPartitionToBlockIds.put(shuffleServerInfo2, partitionToBlockIds);
shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, testAppId, 0, 0L, 1);
- Roaring64NavigableMap report =
+ BlockIdSet report =
shuffleWriteClientImpl.getShuffleResult(
"GRPC",
Sets.newHashSet(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
@@ -414,9 +415,9 @@
expectedData,
Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
- Roaring64NavigableMap succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet succBlockIdBitmap = BlockIdSet.empty();
for (Long blockId : result.getSuccessBlockIds()) {
- succBlockIdBitmap.addLong(blockId);
+ succBlockIdBitmap.add(blockId);
}
assertEquals(0, result.getFailedBlockIds().size());
assertEquals(blockIdBitmap, succBlockIdBitmap);
@@ -439,7 +440,7 @@
registerShuffleServer(testAppId, 3, 2, 2, true);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
// When 2 servers are timeout, the block sending should fail
enableTimeout((MockedShuffleServer) grpcShuffleServers.get(1), 500);
@@ -455,9 +456,9 @@
expectedData,
Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
- Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet failedBlockIdBitmap = BlockIdSet.empty();
for (Long blockId : result.getFailedBlockIds()) {
- failedBlockIdBitmap.addLong(blockId);
+ failedBlockIdBitmap.add(blockId);
}
assertEquals(blockIdBitmap, failedBlockIdBitmap);
assertEquals(0, result.getSuccessBlockIds().size());
@@ -501,7 +502,7 @@
enableTimeout((MockedShuffleServer) grpcShuffleServers.get(2), 500);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
List<ShuffleBlockInfo> blocks =
createShuffleBlockList(
0,
@@ -513,13 +514,13 @@
expectedData,
Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
- Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet failedBlockIdBitmap = BlockIdSet.empty();
+ BlockIdSet succBlockIdBitmap = BlockIdSet.empty();
for (Long blockId : result.getSuccessBlockIds()) {
- succBlockIdBitmap.addLong(blockId);
+ succBlockIdBitmap.add(blockId);
}
for (Long blockId : result.getFailedBlockIds()) {
- failedBlockIdBitmap.addLong(blockId);
+ failedBlockIdBitmap.add(blockId);
}
assertEquals(blockIdBitmap, succBlockIdBitmap);
assertEquals(0, failedBlockIdBitmap.getLongCardinality());
@@ -533,7 +534,7 @@
shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, testAppId, 0, 0L, 1);
- Roaring64NavigableMap report =
+ BlockIdSet report =
shuffleWriteClientImpl.getShuffleResult(
"GRPC",
Sets.newHashSet(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
@@ -577,7 +578,7 @@
enableTimeout((MockedShuffleServer) grpcShuffleServers.get(2), 500);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
for (int i = 0; i < 5; i++) {
List<ShuffleBlockInfo> blocks =
@@ -613,7 +614,7 @@
registerShuffleServer(testAppId, 3, 2, 2, true);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
final List<ShuffleBlockInfo> blocks =
createShuffleBlockList(
0,
@@ -633,7 +634,7 @@
serverToPartitionToBlockIds.put(shuffleServerInfo1, partitionToBlockIds);
serverToPartitionToBlockIds.put(shuffleServerInfo2, partitionToBlockIds);
shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, testAppId, 0, 0L, 1);
- Roaring64NavigableMap report =
+ BlockIdSet report =
shuffleWriteClientImpl.getShuffleResult(
"GRPC",
Sets.newHashSet(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
@@ -644,9 +645,9 @@
// data read should success
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
- Roaring64NavigableMap succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet succBlockIdBitmap = BlockIdSet.empty();
for (Long blockId : result.getSuccessBlockIds()) {
- succBlockIdBitmap.addLong(blockId);
+ succBlockIdBitmap.add(blockId);
}
assertEquals(0, result.getFailedBlockIds().size());
assertEquals(blockIdBitmap, succBlockIdBitmap);
@@ -687,9 +688,9 @@
String testAppId = "case6";
registerShuffleServer(testAppId, 3, 2, 2, true);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap0 = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap blockIdBitmap2 = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap0 = BlockIdSet.empty();
+ BlockIdSet blockIdBitmap1 = BlockIdSet.empty();
+ BlockIdSet blockIdBitmap2 = BlockIdSet.empty();
List<ShuffleBlockInfo> partition0 =
createShuffleBlockList(
@@ -753,7 +754,7 @@
registerShuffleServer(testAppId, 3, 2, 2, true);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
// attempt to send data to "all servers", but only the server 0,1 receive data actually
@@ -811,7 +812,7 @@
registerShuffleServer(testAppId, 3, 2, 2, true);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
// attempt to send data to "all servers", but only the server 0,2 receive data actually
@@ -872,7 +873,7 @@
registerShuffleServer(testAppId, 5, 3, 3, true);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
// attempt to send data to "all servers", but only the server 0,1,2 receive data actually
@@ -926,7 +927,7 @@
registerShuffleServer(testAppId, 5, 3, 3, true);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
// attempt to send data to "all servers", but the secondary round is activated due to failures
@@ -980,7 +981,7 @@
registerShuffleServer(testAppId, 5, 4, 2, true);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
// attempt to send data to "all servers", but only the server 0,1,2 receive data actually
@@ -1035,7 +1036,7 @@
registerShuffleServer(testAppId, 3, 2, 2, false);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
for (int i = 0; i < 5; i++) {
@@ -1090,11 +1091,11 @@
Map<Long, byte[]> expectedData,
Roaring64NavigableMap blockIdBitmap) {
CompressedShuffleBlock csb = readClient.readShuffleBlockData();
- Roaring64NavigableMap matched = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet matched = BlockIdSet.empty();
while (csb != null && csb.getByteBuffer() != null) {
for (Map.Entry<Long, byte[]> entry : expectedData.entrySet()) {
if (compareByte(entry.getValue(), csb.getByteBuffer())) {
- matched.addLong(entry.getKey());
+ matched.add(entry.getKey());
break;
}
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
index d082ae0..f00102a 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
@@ -43,6 +43,7 @@
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.ServerType;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.CoordinatorServer;
import org.apache.uniffle.server.MockedGrpcServer;
@@ -152,7 +153,7 @@
String testAppId = "testRpcRetryLogic";
registerShuffleServer(testAppId, 3, 2, 2, true);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
List<ShuffleBlockInfo> blocks =
createShuffleBlockList(
@@ -166,13 +167,13 @@
Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2));
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks);
- Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap successfulBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet failedBlockIdBitmap = BlockIdSet.empty();
+ BlockIdSet successfulBlockIdBitmap = BlockIdSet.empty();
for (Long blockId : result.getSuccessBlockIds()) {
- successfulBlockIdBitmap.addLong(blockId);
+ successfulBlockIdBitmap.add(blockId);
}
for (Long blockId : result.getFailedBlockIds()) {
- failedBlockIdBitmap.addLong(blockId);
+ failedBlockIdBitmap.add(blockId);
}
assertEquals(0, failedBlockIdBitmap.getLongCardinality());
assertEquals(blockIdBitmap, successfulBlockIdBitmap);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
index b53df0d..6ce9b56 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
@@ -26,7 +26,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.TestUtils;
import org.apache.uniffle.client.api.ShuffleReadClient;
@@ -41,6 +40,7 @@
import org.apache.uniffle.common.segment.FixedSizeSegmentSplitter;
import org.apache.uniffle.common.segment.SegmentSplitter;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.ChecksumUtils;
public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
@@ -55,7 +55,7 @@
long taskAttemptId,
int blockNum,
int length,
- Roaring64NavigableMap blockIdBitmap,
+ BlockIdSet blockIdBitmap,
Map<Long, byte[]> dataMap,
List<ShuffleServerInfo> shuffleServerInfoList) {
List<ShuffleBlockInfo> shuffleBlockInfoList = Lists.newArrayList();
@@ -65,7 +65,7 @@
int seqno = ATOMIC_INT.getAndIncrement();
long blockId = LAYOUT.getBlockId(seqno, 0, taskAttemptId);
- blockIdBitmap.addLong(blockId);
+ blockIdBitmap.add(blockId);
dataMap.put(blockId, buf);
shuffleBlockInfoList.add(
new ShuffleBlockInfo(
@@ -89,7 +89,7 @@
long taskAttemptId,
int blockNum,
int length,
- Roaring64NavigableMap blockIdBitmap,
+ BlockIdSet blockIdBitmap,
Map<Long, byte[]> dataMap) {
List<ShuffleServerInfo> shuffleServerInfoList =
Lists.newArrayList(new ShuffleServerInfo("id", "host", 0));
@@ -105,9 +105,9 @@
}
public static Map<Integer, List<ShuffleBlockInfo>> createTestData(
- Roaring64NavigableMap[] bitmaps, Map<Long, byte[]> expectedData) {
+ BlockIdSet[] bitmaps, Map<Long, byte[]> expectedData) {
for (int i = 0; i < 4; i++) {
- bitmaps[i] = Roaring64NavigableMap.bitmapOf();
+ bitmaps[i] = BlockIdSet.empty();
}
List<ShuffleBlockInfo> blocks1 =
createShuffleBlockList(0, 0, 0, 3, 25, bitmaps[0], expectedData, mockSSI);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
index 7d5d114..cf487e1 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
@@ -54,6 +54,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
@@ -120,12 +121,12 @@
clientSpecifiedConcurrency);
shuffleServerClient.registerShuffle(rrsr);
- List<Roaring64NavigableMap> bitmaps = new ArrayList<>();
+ List<BlockIdSet> bitmaps = new ArrayList<>();
Map<Long, byte[]> expectedDataList = new HashMap<>();
IntStream.range(0, 20)
.forEach(
x -> {
- Roaring64NavigableMap bitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet bitmap = BlockIdSet.empty();
bitmaps.add(bitmap);
Map<Long, byte[]> expectedData = Maps.newHashMap();
@@ -167,11 +168,11 @@
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT))
: new ShuffleServerInfo(
LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
- Roaring64NavigableMap blocksBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blocksBitmap = BlockIdSet.empty();
bitmaps.stream()
.forEach(
x -> {
- Iterator<Long> iterator = x.iterator();
+ Iterator<Long> iterator = x.stream().iterator();
while (iterator.hasNext()) {
blocksBitmap.add(iterator.next());
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
index 2eee212..44a0f30 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
@@ -48,6 +48,7 @@
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.ServerType;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.CoordinatorServer;
@@ -131,7 +132,7 @@
(client) -> {
client.registerShuffle(rrsr);
});
- Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet expectBlockIds = BlockIdSet.empty();
Map<Long, byte[]> dataMap = Maps.newHashMap();
Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
bitmaps[0] = Roaring64NavigableMap.bitmapOf();
@@ -254,7 +255,7 @@
int shuffleId,
int partitionId,
List<ShuffleServerInfo> shuffleServerInfoList,
- Roaring64NavigableMap expectBlockIds,
+ BlockIdSet expectBlockIds,
StorageType storageType) {
CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest();
request.setStorageType(storageType.name());
@@ -269,7 +270,7 @@
request.setShuffleServerInfoList(shuffleServerInfoList);
request.setHadoopConf(conf);
request.setExpectBlockIds(expectBlockIds);
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet processBlockIds = BlockIdSet.empty();
request.setProcessBlockIds(processBlockIds);
request.setDistributionType(ShuffleDataDistributionType.NORMAL);
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 9b48662..d5070e8 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -72,6 +72,7 @@
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
@@ -277,8 +278,8 @@
req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 1, layout);
RssGetShuffleResultResponse result = grpcShuffleServerClient.getShuffleResult(req);
- Roaring64NavigableMap blockIdBitmap = result.getBlockIdBitmap();
- assertEquals(Roaring64NavigableMap.bitmapOf(), blockIdBitmap);
+ BlockIdSet blockIdBitmap = result.getBlockIdBitmap();
+ assertEquals(BlockIdSet.empty(), blockIdBitmap);
request = new RssReportShuffleResultRequest("shuffleResultTest", 0, 0L, partitionToBlockIds, 1);
RssReportShuffleResultResponse response = grpcShuffleServerClient.reportShuffleResult(request);
@@ -286,21 +287,21 @@
req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 1, layout);
result = grpcShuffleServerClient.getShuffleResult(req);
blockIdBitmap = result.getBlockIdBitmap();
- Roaring64NavigableMap expectedP1 = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet expectedP1 = BlockIdSet.empty();
addExpectedBlockIds(expectedP1, blockIds1);
assertEquals(expectedP1, blockIdBitmap);
req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 2, layout);
result = grpcShuffleServerClient.getShuffleResult(req);
blockIdBitmap = result.getBlockIdBitmap();
- Roaring64NavigableMap expectedP2 = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet expectedP2 = BlockIdSet.empty();
addExpectedBlockIds(expectedP2, blockIds2);
assertEquals(expectedP2, blockIdBitmap);
req = new RssGetShuffleResultRequest("shuffleResultTest", 0, 3, layout);
result = grpcShuffleServerClient.getShuffleResult(req);
blockIdBitmap = result.getBlockIdBitmap();
- Roaring64NavigableMap expectedP3 = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet expectedP3 = BlockIdSet.empty();
addExpectedBlockIds(expectedP3, blockIds3);
assertEquals(expectedP3, blockIdBitmap);
@@ -338,7 +339,7 @@
req = new RssGetShuffleResultRequest("shuffleResultTest", 1, 1, layout);
result = grpcShuffleServerClient.getShuffleResult(req);
blockIdBitmap = result.getBlockIdBitmap();
- assertEquals(Roaring64NavigableMap.bitmapOf(), blockIdBitmap);
+ assertEquals(BlockIdSet.empty(), blockIdBitmap);
// test with bitmapNum > 1
partitionToBlockIds = Maps.newHashMap();
@@ -351,7 +352,7 @@
request = new RssReportShuffleResultRequest("shuffleResultTest", 2, 1L, partitionToBlockIds, 3);
grpcShuffleServerClient.reportShuffleResult(request);
// validate bitmap in shuffleTaskManager
- Roaring64NavigableMap[] bitmaps =
+ BlockIdSet[] bitmaps =
grpcShuffleServers
.get(0)
.getShuffleTaskManager()
@@ -363,21 +364,21 @@
req = new RssGetShuffleResultRequest("shuffleResultTest", 2, 1, layout);
result = grpcShuffleServerClient.getShuffleResult(req);
blockIdBitmap = result.getBlockIdBitmap();
- expectedP1 = Roaring64NavigableMap.bitmapOf();
+ expectedP1 = BlockIdSet.empty();
addExpectedBlockIds(expectedP1, blockIds1);
assertEquals(expectedP1, blockIdBitmap);
req = new RssGetShuffleResultRequest("shuffleResultTest", 2, 2, layout);
result = grpcShuffleServerClient.getShuffleResult(req);
blockIdBitmap = result.getBlockIdBitmap();
- expectedP2 = Roaring64NavigableMap.bitmapOf();
+ expectedP2 = BlockIdSet.empty();
addExpectedBlockIds(expectedP2, blockIds2);
assertEquals(expectedP2, blockIdBitmap);
req = new RssGetShuffleResultRequest("shuffleResultTest", 2, 3, layout);
result = grpcShuffleServerClient.getShuffleResult(req);
blockIdBitmap = result.getBlockIdBitmap();
- expectedP3 = Roaring64NavigableMap.bitmapOf();
+ expectedP3 = BlockIdSet.empty();
addExpectedBlockIds(expectedP3, blockIds3);
assertEquals(expectedP3, blockIdBitmap);
@@ -395,21 +396,21 @@
req = new RssGetShuffleResultRequest("shuffleResultTest", 4, layout.maxPartitionId, layout);
result = grpcShuffleServerClient.getShuffleResult(req);
blockIdBitmap = result.getBlockIdBitmap();
- expectedP1 = Roaring64NavigableMap.bitmapOf();
+ expectedP1 = BlockIdSet.empty();
addExpectedBlockIds(expectedP1, blockIds1);
assertEquals(expectedP1, blockIdBitmap);
req = new RssGetShuffleResultRequest("shuffleResultTest", 4, 2, layout);
result = grpcShuffleServerClient.getShuffleResult(req);
blockIdBitmap = result.getBlockIdBitmap();
- expectedP2 = Roaring64NavigableMap.bitmapOf();
+ expectedP2 = BlockIdSet.empty();
addExpectedBlockIds(expectedP2, blockIds2);
assertEquals(expectedP2, blockIdBitmap);
req = new RssGetShuffleResultRequest("shuffleResultTest", 4, 3, layout);
result = grpcShuffleServerClient.getShuffleResult(req);
blockIdBitmap = result.getBlockIdBitmap();
- expectedP3 = Roaring64NavigableMap.bitmapOf();
+ expectedP3 = BlockIdSet.empty();
addExpectedBlockIds(expectedP3, blockIds3);
assertEquals(expectedP3, blockIdBitmap);
@@ -729,15 +730,15 @@
t2.join();
t3.join();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
for (Long blockId : expectedBlockIds) {
- blockIdBitmap.addLong(blockId);
+ blockIdBitmap.add(blockId);
}
RssGetShuffleResultRequest req =
new RssGetShuffleResultRequest("multipleShuffleResultTest", 1, 1, layout);
RssGetShuffleResultResponse result = grpcShuffleServerClient.getShuffleResult(req);
- Roaring64NavigableMap actualBlockIdBitmap = result.getBlockIdBitmap();
+ BlockIdSet actualBlockIdBitmap = result.getBlockIdBitmap();
assertEquals(blockIdBitmap, actualBlockIdBitmap);
}
@@ -1064,9 +1065,9 @@
return blockIds;
}
- private void addExpectedBlockIds(Roaring64NavigableMap bitmap, List<Long> blockIds) {
+ private void addExpectedBlockIds(BlockIdSet bitmap, List<Long> blockIds) {
for (int i = 0; i < blockIds.size(); i++) {
- bitmap.addLong(blockIds.get(i));
+ bitmap.add(blockIds.get(i));
}
}
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java
index 67a9dc0..bf3aeb9 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithHadoopTest.java
@@ -48,6 +48,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
@@ -137,7 +138,7 @@
appId, 0, Lists.newArrayList(new PartitionRange(2, 3)), dataBasePath);
shuffleServerClient.registerShuffle(rrsr);
- Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[4];
+ BlockIdSet[] bitmaps = new BlockIdSet[4];
Map<Long, byte[]> expectedData = Maps.newHashMap();
Map<Integer, List<ShuffleBlockInfo>> dataBlocks = createTestData(bitmaps, expectedData);
Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
@@ -248,15 +249,13 @@
}
protected void validateResult(
- ShuffleReadClientImpl readClient,
- Map<Long, byte[]> expectedData,
- Roaring64NavigableMap blockIdBitmap) {
+ ShuffleReadClientImpl readClient, Map<Long, byte[]> expectedData, BlockIdSet blockIdBitmap) {
CompressedShuffleBlock csb = readClient.readShuffleBlockData();
- Roaring64NavigableMap matched = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet matched = BlockIdSet.empty();
while (csb != null && csb.getByteBuffer() != null) {
for (Entry<Long, byte[]> entry : expectedData.entrySet()) {
if (compareByte(entry.getValue(), csb.getByteBuffer())) {
- matched.addLong(entry.getKey());
+ matched.add(entry.getKey());
break;
}
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
index 77330fe..3a72068 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
@@ -56,6 +56,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.CoordinatorServer;
@@ -191,9 +192,9 @@
}
private Map<Integer, List<ShuffleBlockInfo>> createTestData(
- Roaring64NavigableMap[] bitmaps, Map<Long, byte[]> expectedData) {
+ BlockIdSet[] bitmaps, Map<Long, byte[]> expectedData) {
for (int i = 0; i < 4; i++) {
- bitmaps[i] = Roaring64NavigableMap.bitmapOf();
+ bitmaps[i] = BlockIdSet.empty();
}
List<ShuffleBlockInfo> blocks1 =
ShuffleReadWriteBase.createShuffleBlockList(
@@ -265,7 +266,7 @@
ShuffleDataDistributionType.NORMAL);
shuffleServerClient.registerShuffle(rrsr);
- Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[4];
+ BlockIdSet[] bitmaps = new BlockIdSet[4];
Map<Long, byte[]> expectedData = Maps.newHashMap();
Map<Integer, List<ShuffleBlockInfo>> dataBlocks = createTestData(bitmaps, expectedData);
Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
@@ -376,15 +377,13 @@
}
protected void validateResult(
- ShuffleReadClientImpl readClient,
- Map<Long, byte[]> expectedData,
- Roaring64NavigableMap blockIdBitmap) {
+ ShuffleReadClientImpl readClient, Map<Long, byte[]> expectedData, BlockIdSet blockIdBitmap) {
CompressedShuffleBlock csb = readClient.readShuffleBlockData();
- Roaring64NavigableMap matched = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet matched = BlockIdSet.empty();
while (csb != null && csb.getByteBuffer() != null) {
for (Map.Entry<Long, byte[]> entry : expectedData.entrySet()) {
if (TestUtils.compareByte(entry.getValue(), csb.getByteBuffer())) {
- matched.addLong(entry.getKey());
+ matched.add(entry.getKey());
break;
}
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java
index b0ed66d..2a83538 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfLocalOrderTest.java
@@ -56,6 +56,7 @@
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.segment.LocalOrderSegmentSplitter;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
@@ -127,9 +128,9 @@
}
public static Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> createTestDataWithMultiMapIdx(
- Roaring64NavigableMap[] bitmaps, Map<Long, byte[]> expectedData) {
+ BlockIdSet[] bitmaps, Map<Long, byte[]> expectedData) {
for (int i = 0; i < 4; i++) {
- bitmaps[i] = Roaring64NavigableMap.bitmapOf();
+ bitmaps[i] = BlockIdSet.empty();
}
// key: mapIdx
@@ -192,7 +193,7 @@
/** Write the data to shuffle-servers */
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap[] bitMaps = new Roaring64NavigableMap[4];
+ BlockIdSet[] bitMaps = new BlockIdSet[4];
// Create the shuffle block with the mapIdx
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> partitionToBlocksWithMapIdx =
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
index a77472e..3990ee3 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java
@@ -19,6 +19,7 @@
import java.io.File;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,8 +35,6 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-import org.roaringbitmap.longlong.LongIterator;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient;
@@ -50,6 +49,7 @@
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServer;
@@ -149,7 +149,7 @@
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[4];
+ BlockIdSet[] bitmaps = new BlockIdSet[4];
Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = createTestData(bitmaps, expectedData);
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
@@ -207,9 +207,9 @@
assertEquals(expectedBlockIds.size(), matched);
}
- private Set<Long> transBitmapToSet(Roaring64NavigableMap blockIdBitmap) {
+ private Set<Long> transBitmapToSet(BlockIdSet blockIdBitmap) {
Set<Long> blockIds = Sets.newHashSet();
- LongIterator iter = blockIdBitmap.getLongIterator();
+ Iterator<Long> iter = blockIdBitmap.stream().iterator();
while (iter.hasNext()) {
blockIds.add(iter.next());
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java
index f969397..f29deae 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHadoopTest.java
@@ -49,6 +49,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServer;
@@ -166,7 +167,7 @@
Lists.newArrayList(new PartitionRange(0, 0)),
String.format(REMOTE_STORAGE, isNettyMode));
shuffleServerClient.registerShuffle(rrsr);
- Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet expectBlockIds = BlockIdSet.empty();
Map<Long, byte[]> dataMap = Maps.newHashMap();
Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
bitmaps[0] = Roaring64NavigableMap.bitmapOf();
@@ -184,7 +185,7 @@
RssSendShuffleDataResponse response = shuffleServerClient.sendShuffleData(rssdr);
assertSame(StatusCode.SUCCESS, response.getStatusCode());
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet processBlockIds = BlockIdSet.empty();
Roaring64NavigableMap exceptTaskIds = Roaring64NavigableMap.bitmapOf(0);
// read the 1-th segment from memory
MemoryClientReadHandler memoryClientReadHandler =
@@ -236,9 +237,9 @@
expectedData.put(blocks.get(2).getBlockId(), ByteBufUtils.readBytes(blocks.get(2).getData()));
ShuffleDataResult sdr = composedClientReadHandler.readShuffleData();
validateResult(expectedData, sdr);
- processBlockIds.addLong(blocks.get(0).getBlockId());
- processBlockIds.addLong(blocks.get(1).getBlockId());
- processBlockIds.addLong(blocks.get(2).getBlockId());
+ processBlockIds.add(blocks.get(0).getBlockId());
+ processBlockIds.add(blocks.get(1).getBlockId());
+ processBlockIds.add(blocks.get(2).getBlockId());
sdr.getBufferSegments()
.forEach(bs -> composedClientReadHandler.updateConsumedBlockInfo(bs, checkSkippedMetrics));
@@ -261,8 +262,8 @@
expectedData.put(blocks2.get(0).getBlockId(), ByteBufUtils.readBytes(blocks2.get(0).getData()));
expectedData.put(blocks2.get(1).getBlockId(), ByteBufUtils.readBytes(blocks2.get(1).getData()));
validateResult(expectedData, sdr);
- processBlockIds.addLong(blocks2.get(0).getBlockId());
- processBlockIds.addLong(blocks2.get(1).getBlockId());
+ processBlockIds.add(blocks2.get(0).getBlockId());
+ processBlockIds.add(blocks2.get(1).getBlockId());
sdr.getBufferSegments()
.forEach(bs -> composedClientReadHandler.updateConsumedBlockInfo(bs, checkSkippedMetrics));
@@ -271,7 +272,7 @@
expectedData.clear();
expectedData.put(blocks2.get(2).getBlockId(), ByteBufUtils.readBytes(blocks2.get(2).getData()));
validateResult(expectedData, sdr);
- processBlockIds.addLong(blocks2.get(2).getBlockId());
+ processBlockIds.add(blocks2.get(2).getBlockId());
sdr.getBufferSegments()
.forEach(bs -> composedClientReadHandler.updateConsumedBlockInfo(bs, checkSkippedMetrics));
@@ -293,8 +294,8 @@
expectedData.put(blocks3.get(0).getBlockId(), ByteBufUtils.readBytes(blocks3.get(0).getData()));
expectedData.put(blocks3.get(1).getBlockId(), ByteBufUtils.readBytes(blocks3.get(1).getData()));
validateResult(expectedData, sdr);
- processBlockIds.addLong(blocks3.get(0).getBlockId());
- processBlockIds.addLong(blocks3.get(1).getBlockId());
+ processBlockIds.add(blocks3.get(0).getBlockId());
+ processBlockIds.add(blocks3.get(1).getBlockId());
sdr.getBufferSegments()
.forEach(bs -> composedClientReadHandler.updateConsumedBlockInfo(bs, checkSkippedMetrics));
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
index a2de8f7..a8a2834 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
@@ -47,6 +47,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServer;
@@ -143,10 +144,10 @@
new RssRegisterShuffleRequest(
testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), "");
shuffleServerClient.registerShuffle(rrsr);
- Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet expectBlockIds = BlockIdSet.empty();
Map<Long, byte[]> dataMap = Maps.newHashMap();
- Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
- bitmaps[0] = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet[] bitmaps = new BlockIdSet[1];
+ bitmaps[0] = BlockIdSet.empty();
List<ShuffleBlockInfo> blocks =
createShuffleBlockList(shuffleId, partitionId, 0, 3, 25, expectBlockIds, dataMap, mockSSI);
Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
@@ -201,7 +202,7 @@
new MemoryClientReadHandler(
testAppId, shuffleId, partitionId, 50, shuffleServerClient, exceptTaskIds);
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet processBlockIds = BlockIdSet.empty();
LocalFileClientReadHandler localFileQuorumClientReadHandler =
new LocalFileClientReadHandler(
testAppId,
@@ -235,8 +236,8 @@
validateResult(expectedData, sdr);
// send data to shuffle server, flush should happen
- processBlockIds.addLong(blocks.get(0).getBlockId());
- processBlockIds.addLong(blocks.get(1).getBlockId());
+ processBlockIds.add(blocks.get(0).getBlockId());
+ processBlockIds.add(blocks.get(1).getBlockId());
List<ShuffleBlockInfo> blocks2 =
createShuffleBlockList(shuffleId, partitionId, 0, 3, 50, expectBlockIds, dataMap, mockSSI);
@@ -269,20 +270,20 @@
expectedData.put(blocks.get(2).getBlockId(), ByteBufUtils.readBytes(blocks.get(2).getData()));
expectedData.put(blocks2.get(0).getBlockId(), ByteBufUtils.readBytes(blocks2.get(0).getData()));
validateResult(expectedData, sdr);
- processBlockIds.addLong(blocks.get(2).getBlockId());
- processBlockIds.addLong(blocks2.get(0).getBlockId());
+ processBlockIds.add(blocks.get(2).getBlockId());
+ processBlockIds.add(blocks2.get(0).getBlockId());
sdr = composedClientReadHandler.readShuffleData();
expectedData.clear();
expectedData.put(blocks2.get(1).getBlockId(), ByteBufUtils.readBytes(blocks2.get(1).getData()));
validateResult(expectedData, sdr);
- processBlockIds.addLong(blocks2.get(1).getBlockId());
+ processBlockIds.add(blocks2.get(1).getBlockId());
sdr = composedClientReadHandler.readShuffleData();
expectedData.clear();
expectedData.put(blocks2.get(2).getBlockId(), ByteBufUtils.readBytes(blocks2.get(2).getData()));
validateResult(expectedData, sdr);
- processBlockIds.addLong(blocks2.get(2).getBlockId());
+ processBlockIds.add(blocks2.get(2).getBlockId());
sdr = composedClientReadHandler.readShuffleData();
assertNull(sdr);
@@ -304,10 +305,10 @@
new RssRegisterShuffleRequest(
testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), "");
shuffleServerClient.registerShuffle(rrsr);
- Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet expectBlockIds = BlockIdSet.empty();
Map<Long, byte[]> dataMap = Maps.newHashMap();
- Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
- bitmaps[0] = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet[] bitmaps = new BlockIdSet[1];
+ bitmaps[0] = BlockIdSet.empty();
// create blocks which belong to different tasks
List<ShuffleBlockInfo> blocks = Lists.newArrayList();
for (int i = 0; i < 3; i++) {
@@ -389,10 +390,10 @@
new RssRegisterShuffleRequest(
testAppId, 0, Lists.newArrayList(new PartitionRange(0, 0)), "");
shuffleServerClient.registerShuffle(rrsr);
- Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet expectBlockIds = BlockIdSet.empty();
Map<Long, byte[]> dataMap = Maps.newHashMap();
- Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
- bitmaps[0] = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet[] bitmaps = new BlockIdSet[1];
+ bitmaps[0] = BlockIdSet.empty();
List<ShuffleBlockInfo> blocks =
createShuffleBlockList(shuffleId, partitionId, 0, 3, 25, expectBlockIds, dataMap, mockSSI);
Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
@@ -407,7 +408,7 @@
assertSame(StatusCode.SUCCESS, response.getStatusCode());
// read the 1-th segment from memory
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet processBlockIds = BlockIdSet.empty();
Roaring64NavigableMap exceptTaskIds = Roaring64NavigableMap.bitmapOf(0);
MemoryClientReadHandler memoryClientReadHandler =
new MemoryClientReadHandler(
@@ -444,9 +445,9 @@
expectedData.put(blocks.get(2).getBlockId(), ByteBufUtils.readBytes(blocks.get(2).getData()));
ShuffleDataResult sdr = composedClientReadHandler.readShuffleData();
validateResult(expectedData, sdr);
- processBlockIds.addLong(blocks.get(0).getBlockId());
- processBlockIds.addLong(blocks.get(1).getBlockId());
- processBlockIds.addLong(blocks.get(2).getBlockId());
+ processBlockIds.add(blocks.get(0).getBlockId());
+ processBlockIds.add(blocks.get(1).getBlockId());
+ processBlockIds.add(blocks.get(2).getBlockId());
// send data to shuffle server, and wait until flush finish
List<ShuffleBlockInfo> blocks2 =
@@ -480,15 +481,15 @@
expectedData.put(blocks2.get(0).getBlockId(), ByteBufUtils.readBytes(blocks2.get(0).getData()));
expectedData.put(blocks2.get(1).getBlockId(), ByteBufUtils.readBytes(blocks2.get(1).getData()));
validateResult(expectedData, sdr);
- processBlockIds.addLong(blocks2.get(0).getBlockId());
- processBlockIds.addLong(blocks2.get(1).getBlockId());
+ processBlockIds.add(blocks2.get(0).getBlockId());
+ processBlockIds.add(blocks2.get(1).getBlockId());
// read the 3-th segment from localFile
sdr = composedClientReadHandler.readShuffleData();
expectedData.clear();
expectedData.put(blocks2.get(2).getBlockId(), ByteBufUtils.readBytes(blocks2.get(2).getData()));
validateResult(expectedData, sdr);
- processBlockIds.addLong(blocks2.get(2).getBlockId());
+ processBlockIds.add(blocks2.get(2).getBlockId());
// all segments are processed
sdr = composedClientReadHandler.readShuffleData();
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index de96781..895660a 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -46,6 +46,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -137,7 +138,7 @@
ShuffleDataDistributionType.NORMAL,
-1);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
// simulator a failed server
ShuffleServerInfo fakeShuffleServerInfo =
@@ -154,13 +155,13 @@
Lists.newArrayList(shuffleServerInfo1, fakeShuffleServerInfo));
SendShuffleDataResult result =
shuffleWriteClientImpl.sendShuffleData(testAppId, blocks, () -> false);
- Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet failedBlockIdBitmap = BlockIdSet.empty();
+ BlockIdSet succBlockIdBitmap = BlockIdSet.empty();
for (Long blockId : result.getFailedBlockIds()) {
- failedBlockIdBitmap.addLong(blockId);
+ failedBlockIdBitmap.add(blockId);
}
for (Long blockId : result.getSuccessBlockIds()) {
- succBlockIdBitmap.addLong(blockId);
+ succBlockIdBitmap.add(blockId);
}
// There will no failed blocks when replica=2
assertEquals(failedBlockIdBitmap.getLongCardinality(), 0);
@@ -178,7 +179,7 @@
serverToPartitionToBlockIds.put(shuffleServerInfo1, ptb);
serverToPartitionToBlockIds.put(fakeShuffleServerInfo, ptb);
shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, testAppId, 0, 0, 2);
- Roaring64NavigableMap report =
+ BlockIdSet report =
shuffleWriteClientImpl.getShuffleResult(
"GRPC", Sets.newHashSet(shuffleServerInfo1, fakeShuffleServerInfo), testAppId, 0, 0);
assertEquals(blockIdBitmap, report);
@@ -218,7 +219,7 @@
serverToPartitionToBlockIds.put(shuffleServerInfo1, partitionToBlocks);
// case1
shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, testAppId, 1, 0, 1);
- Roaring64NavigableMap bitmap =
+ BlockIdSet bitmap =
shuffleWriteClientImpl.getShuffleResult(
"GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 0);
assertTrue(bitmap.isEmpty());
@@ -277,7 +278,7 @@
shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, testAppId, 1, 0, 1);
- Roaring64NavigableMap bitmap =
+ BlockIdSet bitmap =
shuffleWriteClientImpl.getShuffleResult(
"GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 0);
assertTrue(bitmap.isEmpty());
@@ -334,7 +335,7 @@
ShuffleDataDistributionType.NORMAL,
-1);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
List<ShuffleBlockInfo> blocks =
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
index 0401b2c..bfa5215 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
@@ -43,7 +43,6 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
@@ -55,6 +54,7 @@
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.shuffle.manager.RssShuffleManagerBase;
import org.apache.uniffle.storage.util.StorageType;
@@ -225,7 +225,7 @@
.collect(Collectors.toSet());
for (int partitionId : new int[] {0, 1}) {
- Roaring64NavigableMap blockIdLongs =
+ BlockIdSet blockIdLongs =
shuffleWriteClient.getShuffleResult(
ClientType.GRPC.name(), servers, shuffleManager.getAppId(), 0, partitionId);
List<BlockId> blockIds =
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index 11e6054..ae9128d 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.test;
import java.io.File;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@@ -32,7 +33,6 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-import org.roaringbitmap.longlong.LongIterator;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
@@ -50,7 +50,7 @@
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.BlockIdLayout;
-import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
@@ -157,10 +157,10 @@
BlockIdLayout layout = BlockIdLayout.DEFAULT;
registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
createTestData(testAppId, expectedData, blockIdBitmap, taskIdBitmap, isNettyMode);
- blockIdBitmap.addLong(layout.getBlockId(0, 1, 0));
+ blockIdBitmap.add(layout.getBlockId(0, 1, 0));
ShuffleReadClientImpl readClient;
readClient =
baseReadBuilder(isNettyMode)
@@ -187,7 +187,7 @@
registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
List<ShuffleBlockInfo> blocks =
createShuffleBlockList(0, 0, 0, 2, 30, blockIdBitmap, expectedData, mockSSI);
@@ -214,7 +214,7 @@
registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
List<ShuffleBlockInfo> blocks =
createShuffleBlockList(0, 0, 0, 2, 30, blockIdBitmap, expectedData, mockSSI);
sendTestData(testAppId, blocks, isNettyMode);
@@ -252,13 +252,13 @@
Map<Long, byte[]> expectedData1 = Maps.newHashMap();
Map<Long, byte[]> expectedData2 = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap1 = BlockIdSet.empty();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
List<ShuffleBlockInfo> blocks =
createShuffleBlockList(0, 0, 0, 10, 30, blockIdBitmap1, expectedData1, mockSSI);
sendTestData(testAppId, blocks, isNettyMode);
- Roaring64NavigableMap blockIdBitmap2 = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap2 = BlockIdSet.empty();
blocks = createShuffleBlockList(0, 1, 0, 10, 30, blockIdBitmap2, expectedData2, mockSSI);
sendTestData(testAppId, blocks, isNettyMode);
@@ -298,7 +298,7 @@
.appId(testAppId)
.partitionId(1)
.partitionNumPerRange(2)
- .blockIdBitmap(Roaring64NavigableMap.bitmapOf())
+ .blockIdBitmap(BlockIdSet.empty())
.taskIdBitmap(Roaring64NavigableMap.bitmapOf())
.build();
assertNull(readClient.readShuffleBlockData());
@@ -313,17 +313,17 @@
registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
List<ShuffleBlockInfo> blocks =
createShuffleBlockList(0, 0, 0, 5, 30, blockIdBitmap, expectedData, mockSSI);
sendTestData(testAppId, blocks, isNettyMode);
- Roaring64NavigableMap wrongBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
- LongIterator iter = blockIdBitmap.getLongIterator();
+ BlockIdSet wrongBlockIdBitmap = BlockIdSet.empty();
+ Iterator<Long> iter = blockIdBitmap.stream().iterator();
while (iter.hasNext()) {
BlockId blockId = layout.asBlockId(iter.next());
- wrongBlockIdBitmap.addLong(
+ wrongBlockIdBitmap.add(
layout.getBlockId(blockId.sequenceNo, blockId.partitionId + 1, blockId.taskAttemptId));
}
@@ -349,7 +349,7 @@
registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 1);
List<ShuffleBlockInfo> blocks =
@@ -382,24 +382,20 @@
registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 3);
List<ShuffleBlockInfo> blocks =
createShuffleBlockList(0, 0, 0, 5, 30, blockIdBitmap, expectedData, mockSSI);
sendTestData(testAppId, blocks, isNettyMode);
// test case: data generated by speculation task without report result
- blocks =
- createShuffleBlockList(
- 0, 0, 1, 5, 30, Roaring64NavigableMap.bitmapOf(), Maps.newHashMap(), mockSSI);
+ blocks = createShuffleBlockList(0, 0, 1, 5, 30, BlockIdSet.empty(), Maps.newHashMap(), mockSSI);
sendTestData(testAppId, blocks, isNettyMode);
// test case: data generated by speculation task with report result
blocks = createShuffleBlockList(0, 0, 2, 5, 30, blockIdBitmap, Maps.newHashMap(), mockSSI);
sendTestData(testAppId, blocks, isNettyMode);
- blocks =
- createShuffleBlockList(
- 0, 0, 3, 5, 30, Roaring64NavigableMap.bitmapOf(), Maps.newHashMap(), mockSSI);
+ blocks = createShuffleBlockList(0, 0, 3, 5, 30, BlockIdSet.empty(), Maps.newHashMap(), mockSSI);
sendTestData(testAppId, blocks, isNettyMode);
// unexpected taskAttemptId should be filtered
@@ -421,13 +417,13 @@
String testAppId = "localReadTest9";
registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet blockIdBitmap = BlockIdSet.empty();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
List<ShuffleBlockInfo> blocks;
createTestData(testAppId, expectedData, blockIdBitmap, taskIdBitmap, isNettyMode);
- Roaring64NavigableMap beforeAdded = RssUtils.cloneBitMap(blockIdBitmap);
+ BlockIdSet beforeAdded = blockIdBitmap.copy();
// write data by another task, read data again, the cache for index file should be updated
blocks = createShuffleBlockList(0, 0, 1, 3, 25, blockIdBitmap, Maps.newHashMap(), mockSSI);
sendTestData(testAppId, blocks, isNettyMode);
@@ -467,8 +463,8 @@
registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap expectedBlockIds = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap unexpectedBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet expectedBlockIds = BlockIdSet.empty();
+ BlockIdSet unexpectedBlockIds = BlockIdSet.empty();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 1);
// send some expected data
List<ShuffleBlockInfo> blocks =
@@ -528,7 +524,7 @@
private void createTestData(
String testAppId,
Map<Long, byte[]> expectedData,
- Roaring64NavigableMap blockIdBitmap,
+ BlockIdSet blockIdBitmap,
Roaring64NavigableMap taskIdBitmap,
boolean isNettyMode) {
List<ShuffleBlockInfo> blocks =
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleResultResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleResultResponse.java
index aca33aa..5003071 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleResultResponse.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetShuffleResultResponse.java
@@ -19,24 +19,24 @@
import java.io.IOException;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
-
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
+import org.apache.uniffle.common.util.RoaringBitmapBlockIdSet;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.proto.RssProtos;
public class RssGetShuffleResultResponse extends ClientResponse {
- private Roaring64NavigableMap blockIdBitmap;
+ private BlockIdSet blockIdBitmap;
public RssGetShuffleResultResponse(StatusCode statusCode, byte[] serializedBitmap)
throws IOException {
super(statusCode);
- blockIdBitmap = RssUtils.deserializeBitMap(serializedBitmap);
+ blockIdBitmap = new RoaringBitmapBlockIdSet(RssUtils.deserializeBitMap(serializedBitmap));
}
- public Roaring64NavigableMap getBlockIdBitmap() {
+ public BlockIdSet getBlockIdBitmap() {
return blockIdBitmap;
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 9237884..4513ea8 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -27,13 +27,13 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.server.flush.EventDiscardException;
@@ -58,8 +58,7 @@
private final ShuffleServerConf shuffleServerConf;
private Configuration hadoopConf;
// appId -> shuffleId -> committed shuffle blockIds
- private Map<String, Map<Integer, Roaring64NavigableMap>> committedBlockIds =
- JavaUtils.newConcurrentMap();
+ private Map<String, Map<Integer, BlockIdSet>> committedBlockIds = JavaUtils.newConcurrentMap();
private final int retryMax;
private final StorageManager storageManager;
@@ -209,23 +208,19 @@
return;
}
committedBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
- Map<Integer, Roaring64NavigableMap> shuffleToBlockIds = committedBlockIds.get(appId);
- shuffleToBlockIds.computeIfAbsent(shuffleId, key -> Roaring64NavigableMap.bitmapOf());
- Roaring64NavigableMap bitmap = shuffleToBlockIds.get(shuffleId);
- synchronized (bitmap) {
- for (ShufflePartitionedBlock spb : blocks) {
- bitmap.addLong(spb.getBlockId());
- }
- }
+ Map<Integer, BlockIdSet> shuffleToBlockIds = committedBlockIds.get(appId);
+ shuffleToBlockIds.computeIfAbsent(shuffleId, key -> BlockIdSet.empty());
+ BlockIdSet blockIds = shuffleToBlockIds.get(shuffleId);
+ blockIds.addAll(blocks.stream().mapToLong(ShufflePartitionedBlock::getBlockId));
}
- public Roaring64NavigableMap getCommittedBlockIds(String appId, Integer shuffleId) {
- Map<Integer, Roaring64NavigableMap> shuffleIdToBlockIds = committedBlockIds.get(appId);
+ public BlockIdSet getCommittedBlockIds(String appId, Integer shuffleId) {
+ Map<Integer, BlockIdSet> shuffleIdToBlockIds = committedBlockIds.get(appId);
if (shuffleIdToBlockIds == null) {
LOG.warn("Unexpected value when getCommittedBlockIds for appId[" + appId + "]");
- return Roaring64NavigableMap.bitmapOf();
+ return BlockIdSet.empty();
}
- Roaring64NavigableMap blockIds = shuffleIdToBlockIds.get(shuffleId);
+ BlockIdSet blockIds = shuffleIdToBlockIds.get(shuffleId);
if (blockIds == null) {
LOG.warn(
"Unexpected value when getCommittedBlockIds for appId["
@@ -233,7 +228,7 @@
+ "], shuffleId["
+ shuffleId
+ "]");
- return Roaring64NavigableMap.bitmapOf();
+ return BlockIdSet.empty();
}
return blockIds;
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index f45b1be..42efa35 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -25,11 +25,11 @@
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Sets;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.JavaUtils;
/**
@@ -46,7 +46,7 @@
private Map<Integer, Object> commitLocks;
/** shuffleId -> blockIds */
- private Map<Integer, Roaring64NavigableMap> cachedBlockIds;
+ private Map<Integer, BlockIdSet> cachedBlockIds;
private AtomicReference<String> user;
@@ -93,7 +93,7 @@
return commitLocks;
}
- public Map<Integer, Roaring64NavigableMap> getCachedBlockIds() {
+ public Map<Integer, BlockIdSet> getCachedBlockIds() {
return cachedBlockIds;
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 87e5a5e..c7f1c4a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -63,9 +63,9 @@
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
-import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
@@ -106,7 +106,7 @@
// merge different blockId of partition to one bitmap can reduce memory cost,
// but when get blockId, performance will degrade a little which can be optimized by client
// configuration
- private Map<String, Map<Integer, Roaring64NavigableMap[]>> partitionsToBlockIds;
+ private Map<String, Map<Integer, BlockIdSet[]>> partitionsToBlockIds;
private final ShuffleBufferManager shuffleBufferManager;
private Map<String, ShuffleTaskInfo> shuffleTaskInfos = JavaUtils.newConcurrentMap();
private Map<Long, PreAllocatedBufferInfo> requireBufferIds = JavaUtils.newConcurrentMap();
@@ -330,8 +330,8 @@
public StatusCode commitShuffle(String appId, int shuffleId) throws Exception {
long start = System.currentTimeMillis();
refreshAppId(appId);
- Roaring64NavigableMap cachedBlockIds = getCachedBlockIds(appId, shuffleId);
- Roaring64NavigableMap cloneBlockIds;
+ BlockIdSet cachedBlockIds = getCachedBlockIds(appId, shuffleId);
+ BlockIdSet cloneBlockIds;
ShuffleTaskInfo shuffleTaskInfo =
shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo(appId));
Object lock = shuffleTaskInfo.getCommitLocks().computeIfAbsent(shuffleId, x -> new Object());
@@ -340,20 +340,17 @@
if (System.currentTimeMillis() - start > commitTimeout) {
throw new RssException("Shuffle data commit timeout for " + commitTimeout + " ms");
}
- synchronized (cachedBlockIds) {
- cloneBlockIds = RssUtils.cloneBitMap(cachedBlockIds);
- }
+ cloneBlockIds = cachedBlockIds.copy();
long expectedCommitted = cloneBlockIds.getLongCardinality();
shuffleBufferManager.commitShuffleTask(appId, shuffleId);
- Roaring64NavigableMap committedBlockIds;
- Roaring64NavigableMap cloneCommittedBlockIds;
+ BlockIdSet committedBlockIds;
+ BlockIdSet cloneCommittedBlockIds;
long checkInterval = 1000L;
while (true) {
committedBlockIds = shuffleFlushManager.getCommittedBlockIds(appId, shuffleId);
- synchronized (committedBlockIds) {
- cloneCommittedBlockIds = RssUtils.cloneBitMap(committedBlockIds);
- }
- cloneBlockIds.andNot(cloneCommittedBlockIds);
+ // create thread-safe copy, then remove those block ids
+ cloneCommittedBlockIds = committedBlockIds.copy();
+ cloneBlockIds.removeAll(cloneCommittedBlockIds);
if (cloneBlockIds.isEmpty()) {
break;
}
@@ -390,20 +387,20 @@
public void addFinishedBlockIds(
String appId, Integer shuffleId, Map<Integer, long[]> partitionToBlockIds, int bitmapNum) {
refreshAppId(appId);
- Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
+ Map<Integer, BlockIdSet[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
if (shuffleIdToPartitions == null) {
throw new RssException("appId[" + appId + "] is expired!");
}
shuffleIdToPartitions.computeIfAbsent(
shuffleId,
key -> {
- Roaring64NavigableMap[] blockIds = new Roaring64NavigableMap[bitmapNum];
+ BlockIdSet[] blockIds = new BlockIdSet[bitmapNum];
for (int i = 0; i < bitmapNum; i++) {
- blockIds[i] = Roaring64NavigableMap.bitmapOf();
+ blockIds[i] = BlockIdSet.empty();
}
return blockIds;
});
- Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
+ BlockIdSet[] blockIds = shuffleIdToPartitions.get(shuffleId);
if (blockIds.length != bitmapNum) {
throw new InvalidRequestException(
"Request expects "
@@ -415,12 +412,8 @@
for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) {
Integer partitionId = entry.getKey();
- Roaring64NavigableMap bitmap = blockIds[partitionId % bitmapNum];
- synchronized (bitmap) {
- for (long blockId : entry.getValue()) {
- bitmap.addLong(blockId);
- }
- }
+ BlockIdSet bitmap = blockIds[partitionId % bitmapNum];
+ bitmap.addAll(Arrays.stream(entry.getValue()));
}
}
@@ -444,28 +437,22 @@
}
ShuffleTaskInfo shuffleTaskInfo =
shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo(appId));
- Roaring64NavigableMap bitmap =
- shuffleTaskInfo
- .getCachedBlockIds()
- .computeIfAbsent(shuffleId, x -> Roaring64NavigableMap.bitmapOf());
+ BlockIdSet blockIds =
+ shuffleTaskInfo.getCachedBlockIds().computeIfAbsent(shuffleId, x -> BlockIdSet.empty());
long size = 0L;
- synchronized (bitmap) {
- for (ShufflePartitionedBlock spb : spbs) {
- bitmap.addLong(spb.getBlockId());
- size += spb.getSize();
- }
- }
+ blockIds.addAll(Arrays.stream(spbs).mapToLong(ShufflePartitionedBlock::getBlockId));
+ size += Arrays.stream(spbs).mapToLong(ShufflePartitionedBlock::getSize).sum();
long partitionSize = shuffleTaskInfo.addPartitionDataSize(shuffleId, partitionId, size);
if (shuffleBufferManager.isHugePartition(partitionSize)) {
shuffleTaskInfo.markHugePartition(shuffleId, partitionId);
}
}
- public Roaring64NavigableMap getCachedBlockIds(String appId, int shuffleId) {
- Map<Integer, Roaring64NavigableMap> shuffleIdToBlockIds =
+ public BlockIdSet getCachedBlockIds(String appId, int shuffleId) {
+ Map<Integer, BlockIdSet> shuffleIdToBlockIds =
shuffleTaskInfos.getOrDefault(appId, new ShuffleTaskInfo(appId)).getCachedBlockIds();
- Roaring64NavigableMap blockIds = shuffleIdToBlockIds.get(shuffleId);
+ BlockIdSet blockIds = shuffleIdToBlockIds.get(shuffleId);
if (blockIds == null) {
LOG.warn(
"Unexpected value when getCachedBlockIds for appId["
@@ -473,7 +460,7 @@
+ "], shuffleId["
+ shuffleId
+ "]");
- return Roaring64NavigableMap.bitmapOf();
+ return BlockIdSet.empty();
}
return blockIds;
}
@@ -551,12 +538,12 @@
storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
}
}
- Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
+ Map<Integer, BlockIdSet[]> shuffleIdToPartitions = partitionsToBlockIds.get(appId);
if (shuffleIdToPartitions == null) {
return null;
}
- Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
+ BlockIdSet[] blockIds = shuffleIdToPartitions.get(shuffleId);
if (blockIds == null) {
return new byte[] {};
}
@@ -571,29 +558,29 @@
}
}
- Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet res = BlockIdSet.empty();
for (Map.Entry<Integer, Set<Integer>> entry : bitmapIndexToPartitions.entrySet()) {
Set<Integer> requestPartitions = entry.getValue();
- Roaring64NavigableMap bitmap = blockIds[entry.getKey()];
- getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout);
+ BlockIdSet set = blockIds[entry.getKey()];
+ getBlockIdsByPartitionId(requestPartitions, set, res, blockIdLayout);
}
- return RssUtils.serializeBitMap(res);
+ return res.serialize();
}
// filter the specific partition blockId in the bitmap to the resultBitmap
- protected Roaring64NavigableMap getBlockIdsByPartitionId(
+ protected BlockIdSet getBlockIdsByPartitionId(
Set<Integer> requestPartitions,
- Roaring64NavigableMap bitmap,
- Roaring64NavigableMap resultBitmap,
+ BlockIdSet blockIds,
+ BlockIdSet result,
BlockIdLayout blockIdLayout) {
- bitmap.forEach(
+ blockIds.forEach(
blockId -> {
int partitionId = blockIdLayout.getPartitionId(blockId);
if (requestPartitions.contains(partitionId)) {
- resultBitmap.addLong(blockId);
+ result.add(blockId);
}
});
- return resultBitmap;
+ return result;
}
public ShuffleDataResult getInMemoryShuffleData(
@@ -857,7 +844,7 @@
}
@VisibleForTesting
- public Map<String, Map<Integer, Roaring64NavigableMap[]>> getPartitionsToBlockIds() {
+ public Map<String, Map<Integer, BlockIdSet[]>> getPartitionsToBlockIds() {
return partitionsToBlockIds;
}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 2b21445..51c5ddf 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -45,13 +45,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
@@ -670,11 +670,11 @@
List<ShufflePartitionedBlock> blocks,
int partitionNumPerRange,
String basePath) {
- Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet expectBlockIds = BlockIdSet.empty();
+ BlockIdSet processBlockIds = BlockIdSet.empty();
Set<Long> remainIds = Sets.newHashSet();
for (ShufflePartitionedBlock spb : blocks) {
- expectBlockIds.addLong(spb.getBlockId());
+ expectBlockIds.add(spb.getBlockId());
remainIds.add(spb.getBlockId());
}
HadoopClientReadHandler handler =
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 75c49cd..008192f 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -55,6 +55,7 @@
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
@@ -769,44 +770,38 @@
ShuffleServerConf conf = new ShuffleServerConf();
ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, null, null, null);
- Roaring64NavigableMap expectedBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet expectedBlockIds = BlockIdSet.empty();
int expectedPartitionId = 5;
- Roaring64NavigableMap bitmapBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet bitmapBlockIds = BlockIdSet.empty();
for (int taskId = 1; taskId < 10; taskId++) {
for (int partitionId = 1; partitionId < 10; partitionId++) {
for (int i = 0; i < 2; i++) {
long blockId = layout.getBlockId(i, partitionId, taskId);
- bitmapBlockIds.addLong(blockId);
+ bitmapBlockIds.add(blockId);
if (partitionId == expectedPartitionId) {
- expectedBlockIds.addLong(blockId);
+ expectedBlockIds.add(blockId);
}
}
}
}
- Roaring64NavigableMap resultBlockIds =
+ BlockIdSet resultBlockIds =
shuffleTaskManager.getBlockIdsByPartitionId(
- Sets.newHashSet(expectedPartitionId),
- bitmapBlockIds,
- Roaring64NavigableMap.bitmapOf(),
- layout);
+ Sets.newHashSet(expectedPartitionId), bitmapBlockIds, BlockIdSet.empty(), layout);
assertEquals(expectedBlockIds, resultBlockIds);
- bitmapBlockIds.addLong(layout.getBlockId(0, 0, 0));
+ bitmapBlockIds.add(layout.getBlockId(0, 0, 0));
resultBlockIds =
shuffleTaskManager.getBlockIdsByPartitionId(
- Sets.newHashSet(0), bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout);
- assertEquals(Roaring64NavigableMap.bitmapOf(0L), resultBlockIds);
+ Sets.newHashSet(0), bitmapBlockIds, BlockIdSet.empty(), layout);
+ assertEquals(BlockIdSet.of(0L), resultBlockIds);
long expectedBlockId =
layout.getBlockId(layout.maxSequenceNo, layout.maxPartitionId, layout.maxTaskAttemptId);
- bitmapBlockIds.addLong(expectedBlockId);
+ bitmapBlockIds.add(expectedBlockId);
resultBlockIds =
shuffleTaskManager.getBlockIdsByPartitionId(
- Sets.newHashSet(layout.maxPartitionId),
- bitmapBlockIds,
- Roaring64NavigableMap.bitmapOf(),
- layout);
- assertEquals(Roaring64NavigableMap.bitmapOf(expectedBlockId), resultBlockIds);
+ Sets.newHashSet(layout.maxPartitionId), bitmapBlockIds, BlockIdSet.empty(), layout);
+ assertEquals(BlockIdSet.of(expectedBlockId), resultBlockIds);
}
@Test
@@ -815,17 +810,17 @@
ShuffleServerConf conf = new ShuffleServerConf();
ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, null, null, null);
- Roaring64NavigableMap expectedBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet expectedBlockIds = BlockIdSet.empty();
int startPartition = 3;
int endPartition = 5;
- Roaring64NavigableMap bitmapBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet bitmapBlockIds = BlockIdSet.empty();
for (int taskId = 1; taskId < 10; taskId++) {
for (int partitionId = 1; partitionId < 10; partitionId++) {
for (int i = 0; i < 2; i++) {
long blockId = layout.getBlockId(i, partitionId, taskId);
- bitmapBlockIds.addLong(blockId);
+ bitmapBlockIds.add(blockId);
if (partitionId >= startPartition && partitionId <= endPartition) {
- expectedBlockIds.addLong(blockId);
+ expectedBlockIds.add(blockId);
}
}
}
@@ -839,14 +834,14 @@
}
}
- Roaring64NavigableMap resultBlockIds =
+ BlockIdSet resultBlockIds =
shuffleTaskManager.getBlockIdsByPartitionId(
- requestPartitions, bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout);
+ requestPartitions, bitmapBlockIds, BlockIdSet.empty(), layout);
assertEquals(expectedBlockIds, resultBlockIds);
assertEquals(
bitmapBlockIds,
shuffleTaskManager.getBlockIdsByPartitionId(
- allPartitions, bitmapBlockIds, Roaring64NavigableMap.bitmapOf(), layout));
+ allPartitions, bitmapBlockIds, BlockIdSet.empty(), layout));
}
@Test
@@ -1101,11 +1096,11 @@
int partitionId,
List<ShufflePartitionedBlock> blocks,
String basePath) {
- Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet expectBlockIds = BlockIdSet.empty();
+ BlockIdSet processBlockIds = BlockIdSet.empty();
Set<Long> remainIds = Sets.newHashSet();
for (ShufflePartitionedBlock spb : blocks) {
- expectBlockIds.addLong(spb.getBlockId());
+ expectBlockIds.add(spb.getBlockId());
remainIds.add(spb.getBlockId());
}
HadoopClientReadHandler handler =
diff --git a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index 4edca3b..4fe7dbd 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -29,6 +29,7 @@
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
@@ -123,8 +124,8 @@
.getShuffleServerClient(request.getClientType().name(), ssi, request.getClientConf());
Roaring64NavigableMap expectTaskIds = null;
if (request.isExpectedTaskIdsBitmapFilterEnable()) {
- Roaring64NavigableMap realExceptBlockIds = RssUtils.cloneBitMap(request.getExpectBlockIds());
- realExceptBlockIds.xor(request.getProcessBlockIds());
+ BlockIdSet realExceptBlockIds = request.getExpectBlockIds().copy();
+ realExceptBlockIds.removeAll(request.getProcessBlockIds());
expectTaskIds = RssUtils.generateTaskIdBitMap(realExceptBlockIds, request.getIdHelper());
}
ClientReadHandler memoryClientReadHandler =
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
index 220e029..1ac3590 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java
@@ -24,11 +24,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.segment.SegmentSplitterFactory;
+import org.apache.uniffle.common.util.BlockIdSet;
public abstract class DataSkippableReadHandler extends AbstractClientReadHandler {
private static final Logger LOG = LoggerFactory.getLogger(DataSkippableReadHandler.class);
@@ -36,8 +38,8 @@
protected List<ShuffleDataSegment> shuffleDataSegments = Lists.newArrayList();
protected int segmentIndex = 0;
- protected Roaring64NavigableMap expectBlockIds;
- protected Roaring64NavigableMap processBlockIds;
+ protected BlockIdSet expectBlockIds;
+ protected BlockIdSet processBlockIds;
protected ShuffleDataDistributionType distributionType;
protected Roaring64NavigableMap expectTaskIds;
@@ -47,8 +49,8 @@
int shuffleId,
int partitionId,
int readBufferSize,
- Roaring64NavigableMap expectBlockIds,
- Roaring64NavigableMap processBlockIds,
+ BlockIdSet expectBlockIds,
+ BlockIdSet processBlockIds,
ShuffleDataDistributionType distributionType,
Roaring64NavigableMap expectTaskIds) {
this.appId = appId;
@@ -87,14 +89,14 @@
ShuffleDataResult result = null;
while (segmentIndex < shuffleDataSegments.size()) {
ShuffleDataSegment segment = shuffleDataSegments.get(segmentIndex);
- Roaring64NavigableMap blocksOfSegment = Roaring64NavigableMap.bitmapOf();
- segment.getBufferSegments().forEach(block -> blocksOfSegment.addLong(block.getBlockId()));
+ BlockIdSet blocksOfSegment = BlockIdSet.empty();
+ blocksOfSegment.addAll(
+ segment.getBufferSegments().stream().mapToLong(BufferSegment::getBlockId));
// skip unexpected blockIds
- blocksOfSegment.and(expectBlockIds);
+ blocksOfSegment.retainAll(expectBlockIds);
if (!blocksOfSegment.isEmpty()) {
// skip processed blockIds
- blocksOfSegment.or(processBlockIds);
- blocksOfSegment.xor(processBlockIds);
+ blocksOfSegment.removeAll(processBlockIds);
if (!blocksOfSegment.isEmpty()) {
result = readShuffleData(segment);
segmentIndex++;
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java
index 1cf636e..d745bb7 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java
@@ -35,6 +35,7 @@
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -46,8 +47,8 @@
protected final int partitionNum;
protected final int readBufferSize;
private final String shuffleServerId;
- protected Roaring64NavigableMap expectBlockIds;
- protected Roaring64NavigableMap processBlockIds;
+ protected BlockIdSet expectBlockIds;
+ protected BlockIdSet processBlockIds;
protected final String storageBasePath;
protected final Configuration hadoopConf;
protected final List<HadoopShuffleReadHandler> readHandlers = Lists.newArrayList();
@@ -64,8 +65,8 @@
int partitionNumPerRange,
int partitionNum,
int readBufferSize,
- Roaring64NavigableMap expectBlockIds,
- Roaring64NavigableMap processBlockIds,
+ BlockIdSet expectBlockIds,
+ BlockIdSet processBlockIds,
String storageBasePath,
Configuration hadoopConf,
ShuffleDataDistributionType distributionType,
@@ -98,8 +99,8 @@
int partitionNumPerRange,
int partitionNum,
int readBufferSize,
- Roaring64NavigableMap expectBlockIds,
- Roaring64NavigableMap processBlockIds,
+ BlockIdSet expectBlockIds,
+ BlockIdSet processBlockIds,
String storageBasePath,
Configuration hadoopConf) {
this(
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandler.java
index c7af921..37ca5ba 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandler.java
@@ -31,6 +31,7 @@
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -52,8 +53,8 @@
int partitionId,
String filePrefix,
int readBufferSize,
- Roaring64NavigableMap expectBlockIds,
- Roaring64NavigableMap processBlockIds,
+ BlockIdSet expectBlockIds,
+ BlockIdSet processBlockIds,
Configuration conf,
ShuffleDataDistributionType distributionType,
Roaring64NavigableMap expectTaskIds,
@@ -83,8 +84,8 @@
int partitionId,
String filePrefix,
int readBufferSize,
- Roaring64NavigableMap expectBlockIds,
- Roaring64NavigableMap processBlockIds,
+ BlockIdSet expectBlockIds,
+ BlockIdSet processBlockIds,
Configuration conf)
throws Exception {
this(
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
index 2b5ea8f..881f312 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
@@ -32,6 +32,7 @@
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
+import org.apache.uniffle.common.util.BlockIdSet;
public class LocalFileClientReadHandler extends DataSkippableReadHandler {
private static final Logger LOG = LoggerFactory.getLogger(LocalFileClientReadHandler.class);
@@ -49,8 +50,8 @@
int partitionNumPerRange,
int partitionNum,
int readBufferSize,
- Roaring64NavigableMap expectBlockIds,
- Roaring64NavigableMap processBlockIds,
+ BlockIdSet expectBlockIds,
+ BlockIdSet processBlockIds,
ShuffleServerClient shuffleServerClient,
ShuffleDataDistributionType distributionType,
Roaring64NavigableMap expectTaskIds,
@@ -81,8 +82,8 @@
int partitionNumPerRange,
int partitionNum,
int readBufferSize,
- Roaring64NavigableMap expectBlockIds,
- Roaring64NavigableMap processBlockIds,
+ BlockIdSet expectBlockIds,
+ BlockIdSet processBlockIds,
ShuffleServerClient shuffleServerClient) {
this(
appId,
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
index c3e2200..750c5e3 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
@@ -19,7 +19,6 @@
import java.util.List;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,6 +26,7 @@
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
@@ -36,16 +36,16 @@
private final List<ClientReadHandler> handlers;
private final List<ShuffleServerInfo> shuffleServerInfos;
- private final Roaring64NavigableMap blockIdBitmap;
- private final Roaring64NavigableMap processedBlockIds;
+ private final BlockIdSet blockIdBitmap;
+ private final BlockIdSet processedBlockIds;
private int readHandlerIndex;
public MultiReplicaClientReadHandler(
List<ClientReadHandler> handlers,
List<ShuffleServerInfo> shuffleServerInfos,
- Roaring64NavigableMap blockIdBitmap,
- Roaring64NavigableMap processedBlockIds) {
+ BlockIdSet blockIdBitmap,
+ BlockIdSet processedBlockIds) {
this.handlers = handlers;
this.blockIdBitmap = blockIdBitmap;
this.processedBlockIds = processedBlockIds;
diff --git a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
index 9b73dc8..501fa4f 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
@@ -27,6 +27,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.IdHelper;
public class CreateShuffleReadHandlerRequest {
@@ -45,8 +46,8 @@
private RssBaseConf rssBaseConf;
private Configuration hadoopConf;
private List<ShuffleServerInfo> shuffleServerInfoList;
- private Roaring64NavigableMap expectBlockIds;
- private Roaring64NavigableMap processBlockIds;
+ private BlockIdSet expectBlockIds;
+ private BlockIdSet processBlockIds;
private ShuffleDataDistributionType distributionType;
private Roaring64NavigableMap expectTaskIds;
private boolean expectedTaskIdsBitmapFilterEnable;
@@ -171,19 +172,19 @@
this.hadoopConf = hadoopConf;
}
- public void setExpectBlockIds(Roaring64NavigableMap expectBlockIds) {
+ public void setExpectBlockIds(BlockIdSet expectBlockIds) {
this.expectBlockIds = expectBlockIds;
}
- public Roaring64NavigableMap getExpectBlockIds() {
+ public BlockIdSet getExpectBlockIds() {
return expectBlockIds;
}
- public void setProcessBlockIds(Roaring64NavigableMap processBlockIds) {
+ public void setProcessBlockIds(BlockIdSet processBlockIds) {
this.processBlockIds = processBlockIds;
}
- public Roaring64NavigableMap getProcessBlockIds() {
+ public BlockIdSet getProcessBlockIds() {
return processBlockIds;
}
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandlerTest.java
index fa684b8..49277a1 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandlerTest.java
@@ -27,11 +27,11 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Test;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.storage.HadoopTestBase;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -52,7 +52,7 @@
new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, "test", hadoopConf, writeUser);
Map<Long, byte[]> expectedData = Maps.newHashMap();
- Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet expectBlockIds = BlockIdSet.empty();
int readBufferSize = 13;
int total = 0;
@@ -65,7 +65,7 @@
writeTestData(writeHandler, num, 3, 0, expectedData);
total += calcExpectedSegmentNum(num, 3, readBufferSize);
expectTotalBlockNum += num;
- expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
+ expectedData.forEach((id, block) -> expectBlockIds.add(id));
}
/** This part is to check the fault tolerance of reading HDFS incomplete index file */
@@ -75,7 +75,7 @@
indexWriter.writeData(ByteBuffer.allocate(4).putInt(999).array());
indexWriter.close();
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet processBlockIds = BlockIdSet.empty();
HadoopShuffleReadHandler indexReader =
new HadoopShuffleReadHandler(
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopHandlerTest.java
index 94c5f3c..241cad3 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopHandlerTest.java
@@ -28,11 +28,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.storage.HadoopTestBase;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
@@ -97,10 +97,10 @@
List<byte[]> expectedData,
List<Long> expectedBlockId)
throws IllegalStateException {
- Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet expectBlockIds = BlockIdSet.empty();
+ BlockIdSet processBlockIds = BlockIdSet.empty();
for (long blockId : expectedBlockId) {
- expectBlockIds.addLong(blockId);
+ expectBlockIds.add(blockId);
}
// read directly and compare
HadoopClientReadHandler readHandler =
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
index c11fc27..af20021 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
@@ -31,12 +31,12 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Test;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.util.BlockIdLayout;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.storage.HadoopShuffleHandlerTestBase;
import org.apache.uniffle.storage.HadoopTestBase;
@@ -65,9 +65,9 @@
int total =
HadoopShuffleHandlerTestBase.calcExpectedSegmentNum(
expectTotalBlockNum, blockSize, readBufferSize);
- Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
- expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
+ BlockIdSet expectBlockIds = BlockIdSet.empty();
+ BlockIdSet processBlockIds = BlockIdSet.empty();
+ expectedData.forEach((id, block) -> expectBlockIds.add(id));
String fileNamePrefix =
ShuffleStorageUtils.getFullShuffleDataFolder(
basePath, ShuffleStorageUtils.getShuffleDataPathWithRange("appId", 0, 1, 1, 10))
@@ -129,9 +129,9 @@
int total =
HadoopShuffleHandlerTestBase.calcExpectedSegmentNum(
expectTotalBlockNum, blockSize, readBufferSize);
- Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
- expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
+ BlockIdSet expectBlockIds = BlockIdSet.empty();
+ BlockIdSet processBlockIds = BlockIdSet.empty();
+ expectedData.forEach((id, block) -> expectBlockIds.add(id));
String fileNamePrefix =
ShuffleStorageUtils.getFullShuffleDataFolder(
basePath, ShuffleStorageUtils.getShuffleDataPathWithRange("appId", 0, 1, 1, 10))
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
index 2a55ae4..b9fadc3 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileServerReadHandlerTest.java
@@ -28,7 +28,6 @@
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
@@ -38,6 +37,7 @@
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockIdSet;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -50,7 +50,7 @@
int blockSize = 7;
ByteBuffer byteBuffer = ByteBuffer.allocate(expectTotalBlockNum * 40);
- Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet expectBlockIds = BlockIdSet.empty();
// We simulate the generation of 4 block index files and 3 block data files to test
// LocalFileClientReadHandler
@@ -77,7 +77,7 @@
new HashSet<>());
byteBuffer.rewind();
- blocks.forEach(block -> expectBlockIds.addLong(block.getBlockId()));
+ blocks.forEach(block -> expectBlockIds.add(block.getBlockId()));
String appId = "app1";
int shuffleId = 1;
@@ -122,7 +122,7 @@
.when(mockShuffleServerClient)
.getShuffleData(Mockito.argThat(segment2Match));
- Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+ BlockIdSet processBlockIds = BlockIdSet.empty();
LocalFileClientReadHandler handler =
new LocalFileClientReadHandler(
appId,