[MINOR] test: Use sensible partition ids in ShuffleReadClientImplTest (#1545)
### What changes were proposed in this pull request?
Explicitly uses partition id `0` or `1` where sensible. Also adds a test that writes and reads two partitions.
### Why are the changes needed?
In `ShuffleReadClientImplTest`, partition id `0` is implicitly used though some tests use `HadoopShuffleWriteHandler` with partition id `1`, which is not realistic.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
index eda8136..f240b12 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
@@ -81,9 +81,10 @@
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
- writeTestData(writeHandler, 2, 30, 0, expectedData, blockIdBitmap);
+ writeTestData(writeHandler, 2, 30, 1, 0, expectedData, blockIdBitmap);
ShuffleReadClientImpl readClient =
baseReadBuilder()
+ .partitionId(1)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
.taskIdBitmap(taskIdBitmap)
@@ -96,9 +97,7 @@
taskIdBitmap.addLong(Constants.MAX_TASK_ATTEMPT_ID - 1);
readClient =
baseReadBuilder()
- .basePath(basePath)
- .blockIdBitmap(blockIdBitmap)
- .taskIdBitmap(taskIdBitmap)
+ .partitionId(1)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
.taskIdBitmap(taskIdBitmap)
@@ -126,11 +125,12 @@
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
- writeTestData(writeHandler1, 2, 30, 0, expectedData, blockIdBitmap);
- writeTestData(writeHandler2, 2, 30, 0, expectedData, blockIdBitmap);
+ writeTestData(writeHandler1, 2, 30, 0, 0, expectedData, blockIdBitmap);
+ writeTestData(writeHandler2, 2, 30, 0, 0, expectedData, blockIdBitmap);
ShuffleReadClientImpl readClient =
baseReadBuilder()
+ .partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -153,8 +153,8 @@
Map<Long, byte[]> expectedData = Maps.newHashMap();
final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
- writeTestData(writeHandler1, 2, 30, 0, expectedData, blockIdBitmap);
- writeTestData(writeHandler2, 2, 30, 0, expectedData, blockIdBitmap);
+ writeTestData(writeHandler1, 2, 30, 0, 0, expectedData, blockIdBitmap);
+ writeTestData(writeHandler2, 2, 30, 0, 0, expectedData, blockIdBitmap);
// duplicate file created, it should be used in product environment
String shuffleFolder = basePath + "/appId/0/0-1";
@@ -189,6 +189,7 @@
ShuffleReadClientImpl readClient =
baseReadBuilder()
+ .partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -209,10 +210,11 @@
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
- writeTestData(writeHandler, 2, 30, 0, expectedData, blockIdBitmap);
+ writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap);
ShuffleReadClientImpl readClient =
baseReadBuilder()
+ .partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -248,9 +250,10 @@
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
- writeTestData(writeHandler, 2, 30, 0, expectedData, blockIdBitmap);
+ writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap);
ShuffleReadClientImpl readClient =
baseReadBuilder()
+ .partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -275,12 +278,12 @@
Map<Long, byte[]> expectedData2 = Maps.newHashMap();
final Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
- writeTestData(writeHandler, 10, 30, 0, expectedData1, blockIdBitmap1);
+ writeTestData(writeHandler, 10, 30, 0, 0, expectedData1, blockIdBitmap1);
Roaring64NavigableMap blockIdBitmap2 = Roaring64NavigableMap.bitmapOf();
- writeTestData(writeHandler, 10, 30, 0, expectedData2, blockIdBitmap2);
+ writeTestData(writeHandler, 10, 30, 0, 0, expectedData2, blockIdBitmap2);
- writeTestData(writeHandler, 10, 30, 0, expectedData1, blockIdBitmap1);
+ writeTestData(writeHandler, 10, 30, 0, 0, expectedData1, blockIdBitmap1);
ShuffleReadClientImpl readClient1 =
baseReadBuilder()
.partitionId(0)
@@ -292,6 +295,7 @@
final ShuffleReadClientImpl readClient2 =
baseReadBuilder()
+ .partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap2)
@@ -315,9 +319,10 @@
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
- writeTestData(writeHandler, 2, 30, 0, expectedData, blockIdBitmap);
+ writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap);
ShuffleReadClientImpl readClient =
baseReadBuilder()
+ .partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -325,6 +330,7 @@
.build();
ShuffleReadClientImpl readClient2 =
baseReadBuilder()
+ .partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -356,6 +362,7 @@
// empty data
ShuffleReadClientImpl readClient =
baseReadBuilder()
+ .partitionId(0)
.partitionNumPerRange(2)
.basePath("basePath")
.blockIdBitmap(Roaring64NavigableMap.bitmapOf())
@@ -374,7 +381,7 @@
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
- writeTestData(writeHandler, 5, 30, 0, expectedData, blockIdBitmap);
+ writeTestData(writeHandler, 5, 30, 0, 0, expectedData, blockIdBitmap);
Roaring64NavigableMap wrongBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
LongIterator iter = blockIdBitmap.getLongIterator();
while (iter.hasNext()) {
@@ -409,10 +416,11 @@
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
- writeTestData(writeHandler, 10, 30, 0, expectedData, blockIdBitmap);
+ writeTestData(writeHandler, 10, 30, 1, 0, expectedData, blockIdBitmap);
// test with different indexReadLimit to validate result
ShuffleReadClientImpl readClient =
baseReadBuilder()
+ .partitionId(1)
.indexReadLimit(1)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -424,6 +432,7 @@
readClient =
baseReadBuilder()
+ .partitionId(1)
.indexReadLimit(2)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -435,6 +444,7 @@
readClient =
baseReadBuilder()
+ .partitionId(1)
.indexReadLimit(3)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -446,6 +456,7 @@
readClient =
baseReadBuilder()
+ .partitionId(1)
.indexReadLimit(10)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -457,6 +468,7 @@
readClient =
baseReadBuilder()
+ .partitionId(1)
.indexReadLimit(11)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -476,13 +488,14 @@
Map<Long, byte[]> expectedData = Maps.newHashMap();
final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 1);
- writeTestData(writeHandler, 5, 30, 0, expectedData, blockIdBitmap);
- writeTestData(writeHandler, 5, 30, 2, Maps.newHashMap(), blockIdBitmap);
- writeTestData(writeHandler, 5, 30, 1, expectedData, blockIdBitmap);
+ writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
+ writeTestData(writeHandler, 5, 30, 1, 2, Maps.newHashMap(), blockIdBitmap);
+ writeTestData(writeHandler, 5, 30, 1, 1, expectedData, blockIdBitmap);
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient =
baseReadBuilder()
+ .partitionId(1)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
.taskIdBitmap(taskIdBitmap)
@@ -502,16 +515,17 @@
Map<Long, byte[]> expectedData = Maps.newHashMap();
final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 3);
- writeTestData(writeHandler, 5, 30, 0, expectedData, blockIdBitmap);
+ writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
// test case: data generated by speculation task without report result
- writeTestData(writeHandler, 5, 30, 1, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
+ writeTestData(writeHandler, 5, 30, 1, 1, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
// test case: data generated by speculation task with report result
- writeTestData(writeHandler, 5, 30, 2, Maps.newHashMap(), blockIdBitmap);
- writeTestData(writeHandler, 5, 30, 3, expectedData, blockIdBitmap);
+ writeTestData(writeHandler, 5, 30, 1, 2, Maps.newHashMap(), blockIdBitmap);
+ writeTestData(writeHandler, 5, 30, 1, 3, expectedData, blockIdBitmap);
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient =
baseReadBuilder()
+ .partitionId(1)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
.taskIdBitmap(taskIdBitmap)
@@ -531,13 +545,14 @@
Map<Long, byte[]> expectedData = Maps.newHashMap();
final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0, 2);
- writeDuplicatedData(writeHandler, 5, 30, 0, expectedData, blockIdBitmap);
- writeTestData(writeHandler, 5, 30, 1, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
- writeTestData(writeHandler, 5, 30, 2, expectedData, blockIdBitmap);
+ writeDuplicatedData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
+ writeTestData(writeHandler, 5, 30, 1, 1, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
+ writeTestData(writeHandler, 5, 30, 1, 2, expectedData, blockIdBitmap);
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient =
baseReadBuilder()
+ .partitionId(1)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
.taskIdBitmap(taskIdBitmap)
@@ -557,15 +572,16 @@
Map<Long, byte[]> expectedData = Maps.newHashMap();
final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
- writeTestData(writeHandler, 5, 30, 0, expectedData, blockIdBitmap);
- writeTestData(writeHandler, 5, 30, 0, expectedData, blockIdBitmap);
- writeTestData(writeHandler, 5, 30, 0, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
- writeTestData(writeHandler, 5, 30, 0, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
- writeTestData(writeHandler, 5, 30, 0, expectedData, blockIdBitmap);
- writeTestData(writeHandler, 5, 30, 0, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
+ writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
+ writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
+ writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
+ writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
+ writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap);
+ writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(), Roaring64NavigableMap.bitmapOf());
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient =
baseReadBuilder()
+ .partitionId(1)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
.taskIdBitmap(taskIdBitmap)
@@ -576,10 +592,53 @@
readClient.close();
}
+ @Test
+ public void readTest16() throws Exception {
+ String basePath = HDFS_URI + "clientReadTest16";
+ HadoopShuffleWriteHandler writeHandler0 =
+ new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
+ HadoopShuffleWriteHandler writeHandler1 =
+ new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);
+
+ Map<Long, byte[]> expectedData0 = Maps.newHashMap();
+ Map<Long, byte[]> expectedData1 = Maps.newHashMap();
+ Roaring64NavigableMap blockIdBitmap0 = Roaring64NavigableMap.bitmapOf();
+ Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf();
+ writeTestData(writeHandler0, 2, 30, 0, 0, expectedData0, blockIdBitmap0);
+ writeTestData(writeHandler1, 2, 30, 1, 1, expectedData1, blockIdBitmap1);
+
+ ShuffleReadClientImpl readClient =
+ baseReadBuilder()
+ .partitionId(0)
+ .partitionNumPerRange(2)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap0)
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf(0))
+ .shuffleServerInfoList(Lists.newArrayList(ssi1, ssi2))
+ .build();
+ TestUtils.validateResult(readClient, expectedData0);
+ readClient.checkProcessedBlockIds();
+ readClient.close();
+
+ readClient =
+ baseReadBuilder()
+ .partitionId(1)
+ .partitionNumPerRange(2)
+ .basePath(basePath)
+ .blockIdBitmap(blockIdBitmap1)
+ .taskIdBitmap(Roaring64NavigableMap.bitmapOf(1))
+ .shuffleServerInfoList(Lists.newArrayList(ssi1, ssi2))
+ .build();
+ TestUtils.validateResult(readClient, expectedData1);
+ readClient.checkProcessedBlockIds();
+ readClient.close();
+ }
+
private void writeTestData(
HadoopShuffleWriteHandler writeHandler,
int num,
int length,
+ int partitionId,
long taskAttemptId,
Map<Long, byte[]> expectedData,
Roaring64NavigableMap blockIdBitmap)
@@ -588,7 +647,7 @@
for (int i = 0; i < num; i++) {
byte[] buf = new byte[length];
new Random().nextBytes(buf);
- long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), 0, taskAttemptId);
+ long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), partitionId, taskAttemptId);
blocks.add(
new ShufflePartitionedBlock(
length, length, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf));
@@ -602,6 +661,7 @@
HadoopShuffleWriteHandler writeHandler,
int num,
int length,
+ int partitionId,
long taskAttemptId,
Map<Long, byte[]> expectedData,
Roaring64NavigableMap blockIdBitmap)
@@ -610,7 +670,7 @@
for (int i = 0; i < num; i++) {
byte[] buf = new byte[length];
new Random().nextBytes(buf);
- long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), 0, taskAttemptId);
+ long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), partitionId, taskAttemptId);
ShufflePartitionedBlock spb =
new ShufflePartitionedBlock(
length, length, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf);