[MINOR] refactor(common): Move blockId bit logic into common class (#1527)
### What changes were proposed in this pull request?
Moves the block id bit-manipulation logic into one place in `common.util`, so that it is tested once and reused everywhere. The shared logic is reused in `RssTezUtils` and `RssMRUtils` where reasonable. Aligns the order of the arguments that construct a `BlockId` with the bit-wise order inside the block id (highest to lowest).
### Why are the changes needed?
Reduces code duplication and improves code readability.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 5be31d3..e92bf07 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -37,32 +37,34 @@
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.Constants;
public class RssMRUtils {
private static final Logger LOG = LoggerFactory.getLogger(RssMRUtils.class);
private static final int MAX_ATTEMPT_LENGTH = 6;
- private static final long MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
+ private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
+ private static final int MAX_SEQUENCE_NO =
+ (1 << (Constants.ATOMIC_INT_MAX_LENGTH - MAX_ATTEMPT_LENGTH)) - 1;
// Class TaskAttemptId have two field id and mapId, rss taskAttemptID have 21 bits,
// mapId is 19 bits, id is 2 bits. MR have a trick logic, taskAttemptId will increase
// 1000 * (appAttemptId - 1), so we will decrease it.
public static long convertTaskAttemptIdToLong(TaskAttemptID taskAttemptID, int appAttemptId) {
- long lowBytes = taskAttemptID.getTaskID().getId();
+ int lowBytes = taskAttemptID.getTaskID().getId();
if (lowBytes > Constants.MAX_TASK_ATTEMPT_ID) {
throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " + lowBytes + " exceed");
}
if (appAttemptId < 1) {
throw new RssException("appAttemptId " + appAttemptId + " is wrong");
}
- long highBytes = (long) taskAttemptID.getId() - (appAttemptId - 1) * 1000;
+ int highBytes = taskAttemptID.getId() - (appAttemptId - 1) * 1000;
if (highBytes > MAX_ATTEMPT_ID || highBytes < 0) {
throw new RssException(
"TaskAttempt " + taskAttemptID + " high bytes " + highBytes + " exceed");
}
- return (highBytes << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH))
- + lowBytes;
+ return BlockId.getBlockId(highBytes, 0, lowBytes);
}
public static TaskAttemptID createMRTaskAttemptId(
@@ -70,14 +72,9 @@
if (appAttemptId < 1) {
throw new RssException("appAttemptId " + appAttemptId + " is wrong");
}
- TaskID taskID =
- new TaskID(jobID, taskType, (int) (rssTaskAttemptId & Constants.MAX_TASK_ATTEMPT_ID));
- return new TaskAttemptID(
- taskID,
- (int)
- (rssTaskAttemptId
- >> (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH))
- + 1000 * (appAttemptId - 1));
+ TaskID taskID = new TaskID(jobID, taskType, BlockId.getTaskAttemptId(rssTaskAttemptId));
+ int id = BlockId.getSequenceNo(rssTaskAttemptId) + 1000 * (appAttemptId - 1);
+ return new TaskAttemptID(taskID, id);
}
public static ShuffleWriteClient createShuffleClient(JobConf jobConf) {
@@ -229,51 +226,31 @@
return rssJobConf.get(key, defaultValue);
}
- public static long getBlockId(long partitionId, long taskAttemptId, int nextSeqNo) {
+ public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {
long attemptId =
taskAttemptId >> (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH);
if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
throw new RssException(
"Can't support attemptId [" + attemptId + "], the max value should be " + MAX_ATTEMPT_ID);
}
- long atomicInt = (nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId;
- if (atomicInt < 0 || atomicInt > Constants.MAX_SEQUENCE_NO) {
+ if (nextSeqNo < 0 || nextSeqNo > MAX_SEQUENCE_NO) {
throw new RssException(
- "Can't support sequence ["
- + atomicInt
- + "], the max value should be "
- + Constants.MAX_SEQUENCE_NO);
+ "Can't support sequence [" + nextSeqNo + "], the max value should be " + MAX_SEQUENCE_NO);
}
- if (partitionId < 0 || partitionId > Constants.MAX_PARTITION_ID) {
- throw new RssException(
- "Can't support partitionId["
- + partitionId
- + "], the max value should be "
- + Constants.MAX_PARTITION_ID);
- }
+
+ int atomicInt = (int) ((nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId);
long taskId =
taskAttemptId
- (attemptId
<< (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
- if (taskId < 0 || taskId > Constants.MAX_TASK_ATTEMPT_ID) {
- throw new RssException(
- "Can't support taskId["
- + taskId
- + "], the max value should be "
- + Constants.MAX_TASK_ATTEMPT_ID);
- }
- return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
- + taskId;
+
+ return BlockId.getBlockId(atomicInt, partitionId, taskId);
}
public static long getTaskAttemptId(long blockId) {
- long mapId = blockId & Constants.MAX_TASK_ATTEMPT_ID;
- long attemptId =
- (blockId >> (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH))
- & MAX_ATTEMPT_ID;
- return (attemptId << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH))
- + mapId;
+ int mapId = BlockId.getTaskAttemptId(blockId);
+ int attemptId = BlockId.getSequenceNo(blockId) & MAX_ATTEMPT_ID;
+ return BlockId.getBlockId(attemptId, 0, mapId);
}
public static int estimateTaskConcurrency(JobConf jobConf) {
diff --git a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
index cb5c2c6..c95951e 100644
--- a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
+++ b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
@@ -87,7 +87,7 @@
long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
for (int partitionId = 0; partitionId <= 3000; partitionId++) {
for (int seqNo = 0; seqNo <= 10; seqNo++) {
- long blockId = RssMRUtils.getBlockId(Long.valueOf(partitionId), taskAttemptId, seqNo);
+ long blockId = RssMRUtils.getBlockId(partitionId, taskAttemptId, seqNo);
int newPartitionId =
Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
assertEquals(partitionId, newPartitionId);
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 5b9a928..9450c0f 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -46,12 +46,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
public class WriteBufferManager extends MemoryConsumer {
@@ -325,8 +325,7 @@
compressTime += System.currentTimeMillis() - start;
}
final long crc32 = ChecksumUtils.getCrc32(compressed);
- final long blockId =
- ClientUtils.getBlockId(partitionId, taskAttemptId, getNextSeqNo(partitionId));
+ final long blockId = BlockId.getBlockId(getNextSeqNo(partitionId), partitionId, taskAttemptId);
uncompressedDataLen += data.length;
shuffleWriteMetrics.incBytesWritten(compressed.length);
// add memory to indicate bytes which will be sent to shuffle server
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
index 0724547..90505d9 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
@@ -34,10 +34,10 @@
import org.apache.spark.serializer.SerializerInstance;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
-import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.storage.HadoopTestBase;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -106,7 +106,7 @@
expectedData.put(key, value);
writeData(serializeStream, key, value);
}
- long blockId = ClientUtils.getBlockId(partitionID, 0, atomicInteger.getAndIncrement());
+ long blockId = BlockId.getBlockId(atomicInteger.getAndIncrement(), partitionID, 0);
blockIdBitmap.add(blockId);
blocks.add(createShuffleBlock(output.toBytes(), blockId, compress));
serializeStream.close();
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
index 0dea951..2962bc1 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
@@ -40,9 +40,9 @@
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
-import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
@@ -81,7 +81,7 @@
validateResult(rssShuffleDataIterator, expectedData, 10);
- blockIdBitmap.add(ClientUtils.getBlockId(0, 0, Constants.MAX_SEQUENCE_NO));
+ blockIdBitmap.add(BlockId.getBlockId(Constants.MAX_SEQUENCE_NO, 0, 0));
rssShuffleDataIterator =
getDataIterator(basePath, blockIdBitmap, taskIdBitmap, Lists.newArrayList(ssi1));
int recNum = 0;
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
index e497995..c1e8643 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
@@ -53,6 +53,7 @@
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.Constants;
public class RssTezUtils {
@@ -60,7 +61,9 @@
private static final Logger LOG = LoggerFactory.getLogger(RssTezUtils.class);
private static final int MAX_ATTEMPT_LENGTH = 6;
- private static final long MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
+ private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
+ private static final int MAX_SEQUENCE_NO =
+ (1 << (Constants.ATOMIC_INT_MAX_LENGTH - MAX_ATTEMPT_LENGTH)) - 1;
public static final String HOST_NAME = "hostname";
@@ -155,7 +158,7 @@
return StringUtils.join(ids, "_", 0, 7);
}
- public static long getBlockId(long partitionId, long taskAttemptId, int nextSeqNo) {
+ public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {
LOG.info(
"GetBlockId, partitionId:{}, taskAttemptId:{}, nextSeqNo:{}",
partitionId,
@@ -167,45 +170,24 @@
throw new RssException(
"Can't support attemptId [" + attemptId + "], the max value should be " + MAX_ATTEMPT_ID);
}
- long atomicInt = (nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId;
- if (atomicInt < 0 || atomicInt > Constants.MAX_SEQUENCE_NO) {
+ if (nextSeqNo < 0 || nextSeqNo > MAX_SEQUENCE_NO) {
throw new RssException(
- "Can't support sequence ["
- + atomicInt
- + "], the max value should be "
- + Constants.MAX_SEQUENCE_NO);
+ "Can't support sequence [" + nextSeqNo + "], the max value should be " + MAX_SEQUENCE_NO);
}
- if (partitionId < 0 || partitionId > Constants.MAX_PARTITION_ID) {
- throw new RssException(
- "Can't support partitionId["
- + partitionId
- + "], the max value should be "
- + Constants.MAX_PARTITION_ID);
- }
+
+ int atomicInt = (int) ((nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId);
long taskId =
taskAttemptId
- (attemptId
<< (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
- if (taskId < 0 || taskId > Constants.MAX_TASK_ATTEMPT_ID) {
- throw new RssException(
- "Can't support taskId["
- + taskId
- + "], the max value should be "
- + Constants.MAX_TASK_ATTEMPT_ID);
- }
- return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
- + taskId;
+ return BlockId.getBlockId(atomicInt, partitionId, taskId);
}
public static long getTaskAttemptId(long blockId) {
- long mapId = blockId & Constants.MAX_TASK_ATTEMPT_ID;
- long attemptId =
- (blockId >> (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH))
- & MAX_ATTEMPT_ID;
- return (attemptId << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH))
- + mapId;
+ int mapId = BlockId.getTaskAttemptId(blockId);
+ int attemptId = BlockId.getSequenceNo(blockId) & MAX_ATTEMPT_ID;
+ return BlockId.getBlockId(attemptId, 0, mapId);
}
public static int estimateTaskConcurrency(Configuration jobConf, int mapNum, int reduceNum) {
@@ -298,18 +280,16 @@
}
public static long convertTaskAttemptIdToLong(TezTaskAttemptID taskAttemptID) {
- long lowBytes = taskAttemptID.getTaskID().getId();
+ int lowBytes = taskAttemptID.getTaskID().getId();
if (lowBytes > Constants.MAX_TASK_ATTEMPT_ID) {
throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " + lowBytes + " exceed");
}
- long highBytes = taskAttemptID.getId();
+ int highBytes = taskAttemptID.getId();
if (highBytes > MAX_ATTEMPT_ID || highBytes < 0) {
throw new RssException(
"TaskAttempt " + taskAttemptID + " high bytes " + highBytes + " exceed.");
}
- long id =
- (highBytes << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH))
- + lowBytes;
+ long id = BlockId.getBlockId(highBytes, 0, lowBytes);
LOG.info("ConvertTaskAttemptIdToLong taskAttemptID:{}, id is {}, .", taskAttemptID, id);
return id;
}
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
index ec83a09..3892458 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
@@ -53,6 +53,7 @@
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.ThreadUtils;
@@ -371,8 +372,8 @@
final long crc32 = ChecksumUtils.getCrc32(compressed);
compressTime += System.currentTimeMillis() - start;
final long blockId =
- RssTezUtils.getBlockId((long) partitionId, taskAttemptId, getNextSeqNo(partitionId));
- LOG.info("blockId is {}", blockId);
+ RssTezUtils.getBlockId(partitionId, taskAttemptId, getNextSeqNo(partitionId));
+ LOG.info("blockId is {}", BlockId.fromLong(blockId));
uncompressedDataLen += data.length;
// add memory to indicate bytes which will be sent to shuffle server
inSendListBytes.addAndGet(wb.getDataLength());
diff --git a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
index c21173d..639521f 100644
--- a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
+++ b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
@@ -99,7 +99,7 @@
long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
for (int partitionId = 0; partitionId <= 3000; partitionId++) {
for (int seqNo = 0; seqNo <= 10; seqNo++) {
- long blockId = RssTezUtils.getBlockId(Long.valueOf(partitionId), taskAttemptId, seqNo);
+ long blockId = RssTezUtils.getBlockId(partitionId, taskAttemptId, seqNo);
int newPartitionId =
Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
assertEquals(partitionId, newPartitionId);
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 7e2f930..e1fbb9f 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -41,6 +41,7 @@
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssFetchFailedException;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.IdHelper;
import org.apache.uniffle.common.util.RssUtils;
@@ -211,14 +212,14 @@
actualCrc = ChecksumUtils.getCrc32(readBuffer, bs.getOffset(), bs.getLength());
crcCheckTime.addAndGet(System.currentTimeMillis() - start);
} catch (Exception e) {
- LOG.warn("Can't read data for blockId[" + bs.getBlockId() + "]", e);
+ LOG.warn("Can't read data for " + BlockId.toString(bs.getBlockId()), e);
}
if (expectedCrc != actualCrc) {
String errMsg =
- "Unexpected crc value for blockId["
- + bs.getBlockId()
- + "], expected:"
+ "Unexpected crc value for "
+ + BlockId.toString(bs.getBlockId())
+ + ", expected:"
+ expectedCrc
+ ", actual:"
+ actualCrc;
diff --git a/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java b/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
index b3330c8..b3d40dc 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
@@ -29,43 +29,10 @@
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.RemoteStorageInfo;
-import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.util.StorageType;
public class ClientUtils {
- // BlockId is positive long (63 bits) composed of partitionId, taskAttemptId and AtomicInteger.
- // AtomicInteger is highest 18 bits, max value is 2^18 - 1
- // partitionId is middle 24 bits, max value is 2^24 - 1
- // taskAttemptId is lowest 21 bits, max value is 2^21 - 1
- // Values of partitionId, taskAttemptId and AtomicInteger are always positive.
- public static Long getBlockId(long partitionId, long taskAttemptId, long atomicInt) {
- if (atomicInt < 0 || atomicInt > Constants.MAX_SEQUENCE_NO) {
- throw new IllegalArgumentException(
- "Can't support sequence["
- + atomicInt
- + "], the max value should be "
- + Constants.MAX_SEQUENCE_NO);
- }
- if (partitionId < 0 || partitionId > Constants.MAX_PARTITION_ID) {
- throw new IllegalArgumentException(
- "Can't support partitionId["
- + partitionId
- + "], the max value should be "
- + Constants.MAX_PARTITION_ID);
- }
- if (taskAttemptId < 0 || taskAttemptId > Constants.MAX_TASK_ATTEMPT_ID) {
- throw new IllegalArgumentException(
- "Can't support taskAttemptId["
- + taskAttemptId
- + "], the max value should be "
- + Constants.MAX_TASK_ATTEMPT_ID);
- }
- return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
- + taskAttemptId;
- }
-
public static RemoteStorageInfo fetchRemoteStorage(
String appId,
RemoteStorageInfo defaultRemoteStorage,
diff --git a/client/src/main/java/org/apache/uniffle/client/util/DefaultIdHelper.java b/client/src/main/java/org/apache/uniffle/client/util/DefaultIdHelper.java
index 97376cc..7161c4f 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/DefaultIdHelper.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/DefaultIdHelper.java
@@ -17,12 +17,12 @@
package org.apache.uniffle.client.util;
-import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.IdHelper;
public class DefaultIdHelper implements IdHelper {
@Override
public long getTaskAttemptId(long blockId) {
- return blockId & Constants.MAX_TASK_ATTEMPT_ID;
+ return BlockId.getTaskAttemptId(blockId);
}
}
diff --git a/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java b/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
index b8e7e7f..19ff6ac 100644
--- a/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
@@ -33,12 +33,11 @@
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.client.util.DefaultIdHelper;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.RssUtils;
import static org.apache.uniffle.client.util.ClientUtils.waitUntilDoneOrFail;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class ClientUtilsTest {
@@ -47,33 +46,6 @@
private ExecutorService executorService = Executors.newFixedThreadPool(10);
@Test
- public void getBlockIdTest() {
- // max value of blockId
- assertEquals(new Long(854558029292503039L), ClientUtils.getBlockId(16777215, 1048575, 24287));
- // just a random test
- assertEquals(new Long(3518437418598500L), ClientUtils.getBlockId(100, 100, 100));
- // min value of blockId
- assertEquals(new Long(0L), ClientUtils.getBlockId(0, 0, 0));
-
- final Throwable e1 =
- assertThrows(IllegalArgumentException.class, () -> ClientUtils.getBlockId(16777216, 0, 0));
- assertTrue(
- e1.getMessage()
- .contains("Can't support partitionId[16777216], the max value should be 16777215"));
-
- final Throwable e2 =
- assertThrows(IllegalArgumentException.class, () -> ClientUtils.getBlockId(0, 2097152, 0));
- assertTrue(
- e2.getMessage()
- .contains("Can't support taskAttemptId[2097152], the max value should be 2097151"));
-
- final Throwable e3 =
- assertThrows(IllegalArgumentException.class, () -> ClientUtils.getBlockId(0, 0, 262144));
- assertTrue(
- e3.getMessage().contains("Can't support sequence[262144], the max value should be 262143"));
- }
-
- @Test
public void testGenerateTaskIdBitMap() {
int partitionId = 1;
Roaring64NavigableMap blockIdMap = Roaring64NavigableMap.bitmapOf();
@@ -82,7 +54,7 @@
for (int i = 0; i < taskSize; i++) {
except[i] = i;
for (int j = 0; j < 100; j++) {
- Long blockId = ClientUtils.getBlockId(partitionId, i, j);
+ long blockId = BlockId.getBlockId(j, partitionId, i);
blockIdMap.addLong(blockId);
}
}
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
index 14e9af9..eda8136 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
@@ -21,7 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -38,6 +38,7 @@
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.HadoopTestBase;
@@ -53,7 +54,7 @@
public class ShuffleReadClientImplTest extends HadoopTestBase {
private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should be thrown";
- private static AtomicLong ATOMIC_LONG = new AtomicLong(0);
+ private static AtomicInteger ATOMIC_INT = new AtomicInteger(0);
private ShuffleServerInfo ssi1 = new ShuffleServerInfo("host1-0", "host1", 0);
private ShuffleServerInfo ssi2 = new ShuffleServerInfo("host2-0", "host2", 0);
@@ -91,7 +92,7 @@
readClient.checkProcessedBlockIds();
readClient.close();
- blockIdBitmap.addLong(Constants.MAX_TASK_ATTEMPT_ID - 1);
+ blockIdBitmap.addLong(BlockId.getBlockId(0, 0, Constants.MAX_TASK_ATTEMPT_ID - 1));
taskIdBitmap.addLong(Constants.MAX_TASK_ATTEMPT_ID - 1);
readClient =
baseReadBuilder()
@@ -377,7 +378,9 @@
Roaring64NavigableMap wrongBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
LongIterator iter = blockIdBitmap.getLongIterator();
while (iter.hasNext()) {
- wrongBlockIdBitmap.addLong(iter.next() + (1 << Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+ BlockId blockId = BlockId.fromLong(iter.next());
+ wrongBlockIdBitmap.addLong(
+ BlockId.getBlockId(blockId.sequenceNo, blockId.partitionId + 1, blockId.taskAttemptId));
}
ShuffleReadClientImpl readClient =
@@ -585,10 +588,7 @@
for (int i = 0; i < num; i++) {
byte[] buf = new byte[length];
new Random().nextBytes(buf);
- long blockId =
- (ATOMIC_LONG.getAndIncrement()
- << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + taskAttemptId;
+ long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), 0, taskAttemptId);
blocks.add(
new ShufflePartitionedBlock(
length, length, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf));
@@ -610,10 +610,7 @@
for (int i = 0; i < num; i++) {
byte[] buf = new byte[length];
new Random().nextBytes(buf);
- long blockId =
- (ATOMIC_LONG.getAndIncrement()
- << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + taskAttemptId;
+ long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), 0, taskAttemptId);
ShufflePartitionedBlock spb =
new ShufflePartitionedBlock(
length, length, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf);
diff --git a/common/src/main/java/org/apache/uniffle/common/BufferSegment.java b/common/src/main/java/org/apache/uniffle/common/BufferSegment.java
index 1ab68c5..ac7faa4 100644
--- a/common/src/main/java/org/apache/uniffle/common/BufferSegment.java
+++ b/common/src/main/java/org/apache/uniffle/common/BufferSegment.java
@@ -20,6 +20,7 @@
import java.util.Objects;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockId;
public class BufferSegment {
@@ -60,9 +61,9 @@
@Override
public String toString() {
- return "BufferSegment{blockId["
- + blockId
- + "], taskAttemptId["
+ return "BufferSegment{"
+ + BlockId.toString(blockId)
+ + ", taskAttemptId["
+ taskAttemptId
+ "], offset["
+ offset
diff --git a/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java b/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java
index 8de75d9..8b5fdc6 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java
@@ -22,6 +22,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ByteBufUtils;
public class ShuffleBlockInfo {
@@ -136,7 +137,7 @@
sb.append("ShuffleBlockInfo:");
sb.append("shuffleId[" + shuffleId + "],");
sb.append("partitionId[" + partitionId + "],");
- sb.append("blockId[" + blockId + "],");
+ sb.append(BlockId.toString(blockId) + ",");
sb.append("length[" + length + "],");
sb.append("uncompressLength[" + uncompressLength + "],");
sb.append("crc[" + crc + "],");
diff --git a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
index 1ce68b6..4772b50 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
@@ -22,6 +22,8 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import org.apache.uniffle.common.util.BlockId;
+
public class ShufflePartitionedBlock {
private int length;
@@ -123,9 +125,9 @@
@Override
public String toString() {
- return "ShufflePartitionedBlock{blockId["
- + blockId
- + "], length["
+ return "ShufflePartitionedBlock{"
+ + BlockId.toString(blockId)
+ + ", length["
+ length
+ "], uncompressLength["
+ uncompressLength
diff --git a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
index 7b3626d..898d0b1 100644
--- a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
+++ b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
@@ -29,7 +29,7 @@
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.BlockId;
public class FixedSizeSegmentSplitter implements SegmentSplitter {
private static final Logger LOGGER = LoggerFactory.getLogger(FixedSizeSegmentSplitter.class);
@@ -82,14 +82,13 @@
// than the length in the actual data file, and it needs to be returned at this time to
// avoid EOFException
if (dataFileLen != -1 && totalLength > dataFileLen) {
- long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
LOGGER.info(
"Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than "
+ "the real data file length: {}(bytes). Partition id: {}. "
+ "This may happen when the data is flushing, please ignore.",
totalLength,
dataFileLen,
- Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask));
+ BlockId.getPartitionId(blockId));
break;
}
diff --git a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
index 98b9cca..cbf0979 100644
--- a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
+++ b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
@@ -31,7 +31,7 @@
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.BlockId;
/**
* {@class LocalOrderSegmentSplitter} will be initialized only when the {@class
@@ -104,14 +104,13 @@
// than the length in the actual data file, and it needs to be returned at this time to
// avoid EOFException
if (dataFileLen != -1 && totalLen > dataFileLen) {
- long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
LOGGER.info(
"Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than "
+ "the real data file length: {}(bytes). Partition id: {}. This should not happen. "
+ "This may happen when the data is flushing, please ignore.",
totalLen,
dataFileLen,
- Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask));
+ BlockId.getPartitionId(blockId));
break;
}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/BlockId.java b/common/src/main/java/org/apache/uniffle/common/util/BlockId.java
new file mode 100644
index 0000000..7aecaae
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/util/BlockId.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+/**
+ * Packs and unpacks the components of a shuffle block id.
+ *
+ * <p>A block id is a non-negative {@code long} (63 bits). From the highest to the lowest bits it
+ * holds:
+ *
+ * <ul>
+ *   <li>the sequence number (the "atomic int" of {@link Constants}): 18 bits, max value 2^18 - 1
+ *   <li>the partition id: 24 bits, max value 2^24 - 1
+ *   <li>the task attempt id: 21 bits, max value 2^21 - 1
+ * </ul>
+ *
+ * <p>Every component is non-negative; zero is a valid value. The bit widths come from {@link
+ * Constants}.
+ */
+public class BlockId {
+  /** The packed block id. */
+  public final long blockId;
+
+  /** Sequence number decoded from the highest bits of {@link #blockId}. */
+  public final int sequenceNo;
+
+  /** Partition id decoded from the middle bits of {@link #blockId}. */
+  public final int partitionId;
+
+  /** Task attempt id decoded from the lowest bits of {@link #blockId}. */
+  public final int taskAttemptId;
+
+  private BlockId(long blockId, int sequenceNo, int partitionId, int taskAttemptId) {
+    this.blockId = blockId;
+    this.sequenceNo = sequenceNo;
+    this.partitionId = partitionId;
+    this.taskAttemptId = taskAttemptId;
+  }
+
+  /** Returns the block id in hexadecimal followed by its decoded components. */
+  @Override
+  public String toString() {
+    return "blockId["
+        + Long.toHexString(blockId)
+        + " (seq: "
+        + sequenceNo
+        + ", part: "
+        + partitionId
+        + ", task: "
+        + taskAttemptId
+        + ")]";
+  }
+
+  /**
+   * Two instances are equal when they wrap the same packed block id; the component fields are
+   * always derived from (or packed into) that value, so comparing it alone is sufficient.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof BlockId)) {
+      return false;
+    }
+    return blockId == ((BlockId) o).blockId;
+  }
+
+  @Override
+  public int hashCode() {
+    return Long.hashCode(blockId);
+  }
+
+  /** Formats a packed block id the same way as {@link #toString()}. */
+  public static String toString(long blockId) {
+    return BlockId.fromLong(blockId).toString();
+  }
+
+  /**
+   * Decodes a packed block id into its components.
+   *
+   * <p>No validation is performed: a negative input (such as {@link Constants#INVALID_BLOCK_ID})
+   * yields a negative {@link #sequenceNo}.
+   */
+  public static BlockId fromLong(long blockId) {
+    int sequenceNo = getSequenceNo(blockId);
+    int partitionId = getPartitionId(blockId);
+    int taskAttemptId = getTaskAttemptId(blockId);
+    return new BlockId(blockId, sequenceNo, partitionId, taskAttemptId);
+  }
+
+  /**
+   * Builds a block id from its components, given in bit order from highest to lowest.
+   *
+   * @throws IllegalArgumentException if any component is negative or exceeds its maximum
+   */
+  public static BlockId fromIds(int sequenceNo, int partitionId, int taskAttemptId) {
+    long blockId = getBlockId(sequenceNo, partitionId, taskAttemptId);
+    return new BlockId(blockId, sequenceNo, partitionId, taskAttemptId);
+  }
+
+  /**
+   * Packs the three components into a single block id.
+   *
+   * @param sequenceNo value in [0, {@link Constants#MAX_SEQUENCE_NO}], stored in the highest bits
+   * @param partitionId value in [0, {@link Constants#MAX_PARTITION_ID}], stored in the middle bits
+   * @param taskAttemptId value in [0, {@link Constants#MAX_TASK_ATTEMPT_ID}], stored in the lowest
+   *     bits
+   * @return the packed, non-negative block id
+   * @throws IllegalArgumentException if any component is negative or exceeds its maximum
+   */
+  public static long getBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
+    checkRange("sequence", sequenceNo, Constants.MAX_SEQUENCE_NO);
+    checkRange("partitionId", partitionId, Constants.MAX_PARTITION_ID);
+    checkRange("taskAttemptId", taskAttemptId, Constants.MAX_TASK_ATTEMPT_ID);
+
+    // The three validated fields occupy disjoint bit ranges, so OR-ing them is lossless.
+    return ((long) sequenceNo
+            << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
+        | ((long) partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
+        | taskAttemptId;
+  }
+
+  /** Rejects a component value outside of [0, max]. */
+  private static void checkRange(String name, long value, int max) {
+    if (value < 0 || value > max) {
+      throw new IllegalArgumentException(
+          "Can't support " + name + "[" + value + "], the max value should be " + max);
+    }
+  }
+
+  /** Extracts the sequence number; the shift is sign-extending and no mask is applied. */
+  public static int getSequenceNo(long blockId) {
+    return (int)
+        (blockId >> (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+  }
+
+  /** Extracts the partition id from the middle bits. */
+  public static int getPartitionId(long blockId) {
+    return (int) ((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & Constants.MAX_PARTITION_ID);
+  }
+
+  /** Extracts the task attempt id from the lowest bits. */
+  public static int getTaskAttemptId(long blockId) {
+    return (int) (blockId & Constants.MAX_TASK_ATTEMPT_ID);
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index 4b35463..a361eab 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -29,14 +29,17 @@
public static final String SHUFFLE_INDEX_FILE_SUFFIX = ".index";
// BlockId is long and consist of partitionId, taskAttemptId, atomicInt
// the length of them are ATOMIC_INT_MAX_LENGTH + PARTITION_ID_MAX_LENGTH +
- // TASK_ATTEMPT_ID_MAX_LENGTH = 63
+ // TASK_ATTEMPT_ID_MAX_LENGTH = 63, each of these lengths must be less than 32
+ // do not access the fields, no need to implement your own bit manipulation logic,
+ // better use methods provided by org.apache.uniffle.common.util.BlockId
public static final int PARTITION_ID_MAX_LENGTH = 24;
public static final int TASK_ATTEMPT_ID_MAX_LENGTH = 21;
public static final int ATOMIC_INT_MAX_LENGTH = 18;
- public static final long MAX_SEQUENCE_NO = (1 << Constants.ATOMIC_INT_MAX_LENGTH) - 1;
- public static final long MAX_PARTITION_ID = (1 << Constants.PARTITION_ID_MAX_LENGTH) - 1;
- public static final long MAX_TASK_ATTEMPT_ID = (1 << Constants.TASK_ATTEMPT_ID_MAX_LENGTH) - 1;
+ public static final int MAX_SEQUENCE_NO = (1 << Constants.ATOMIC_INT_MAX_LENGTH) - 1;
+ public static final int MAX_PARTITION_ID = (1 << Constants.PARTITION_ID_MAX_LENGTH) - 1;
+ public static final int MAX_TASK_ATTEMPT_ID = (1 << Constants.TASK_ATTEMPT_ID_MAX_LENGTH) - 1;
public static final long INVALID_BLOCK_ID = -1L;
+
public static final String KEY_SPLIT_CHAR = "/";
public static final String COMMA_SPLIT_CHAR = ",";
public static final String EQUAL_SPLIT_CHAR = "=";
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index ad2b4c4..6d4e230 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -335,10 +335,9 @@
result.computeIfAbsent(partitionId, key -> Roaring64NavigableMap.bitmapOf());
}
Iterator<Long> it = shuffleBitmap.iterator();
- long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
while (it.hasNext()) {
Long blockId = it.next();
- int partitionId = Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
+ int partitionId = BlockId.getPartitionId(blockId);
if (partitionId >= startPartition && partitionId < endPartition) {
result.get(partitionId).add(blockId);
}
diff --git a/common/src/test/java/org/apache/uniffle/common/BufferSegmentTest.java b/common/src/test/java/org/apache/uniffle/common/BufferSegmentTest.java
index ca10485..49a1c58 100644
--- a/common/src/test/java/org/apache/uniffle/common/BufferSegmentTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/BufferSegmentTest.java
@@ -58,7 +58,7 @@
public void testToString() {
BufferSegment segment = new BufferSegment(0, 1, 2, 3, 4, 5);
assertEquals(
- "BufferSegment{blockId[0], taskAttemptId[5], offset[1], length[2], crc[4], uncompressLength[3]}",
+ "BufferSegment{blockId[0 (seq: 0, part: 0, task: 0)], taskAttemptId[5], offset[1], length[2], crc[4], uncompressLength[3]}",
segment.toString());
}
diff --git a/common/src/test/java/org/apache/uniffle/common/ShuffleBlockInfoTest.java b/common/src/test/java/org/apache/uniffle/common/ShuffleBlockInfoTest.java
index 71db702..fdac0e9 100644
--- a/common/src/test/java/org/apache/uniffle/common/ShuffleBlockInfoTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ShuffleBlockInfoTest.java
@@ -22,6 +22,8 @@
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.common.util.BlockId;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ShuffleBlockInfoTest {
@@ -37,9 +39,9 @@
+ info.getShuffleId()
+ "],partitionId["
+ info.getPartitionId()
- + "],blockId["
- + info.getBlockId()
- + "],length["
+ + "],"
+ + BlockId.toString(info.getBlockId())
+ + ",length["
+ info.getLength()
+ "],uncompressLength["
+ info.getUncompressLength()
@@ -54,9 +56,9 @@
+ info2.getShuffleId()
+ "],partitionId["
+ info2.getPartitionId()
- + "],blockId["
- + info2.getBlockId()
- + "],length["
+ + "],"
+ + BlockId.toString(info2.getBlockId())
+ + ",length["
+ info2.getLength()
+ "],uncompressLength["
+ info2.getUncompressLength()
diff --git a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
index 3f894ab..7f402e5 100644
--- a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
@@ -23,6 +23,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ByteBufUtils;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -70,9 +71,9 @@
public void testToString() {
ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 2, 3, 4, 5, new byte[6]);
assertEquals(
- "ShufflePartitionedBlock{blockId["
- + b1.getBlockId()
- + "], length["
+ "ShufflePartitionedBlock{"
+ + BlockId.toString(b1.getBlockId())
+ + ", length["
+ b1.getLength()
+ "], uncompressLength["
+ b1.getUncompressLength()
diff --git a/common/src/test/java/org/apache/uniffle/common/util/BlockIdTest.java b/common/src/test/java/org/apache/uniffle/common/util/BlockIdTest.java
new file mode 100644
index 0000000..e42717d
--- /dev/null
+++ b/common/src/test/java/org/apache/uniffle/common/util/BlockIdTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class BlockIdTest {
+  @Test
+  public void test() {
+    // max value of blockId: all three components at their maximum fill all 63 bits
+    assertEquals(Long.MAX_VALUE, BlockId.getBlockId(262143, 16777215, 2097151));
+    // large components, but sequence number and task attempt id are below their maximum
+    assertEquals(854558029292503039L, BlockId.getBlockId(24287, 16777215, 1048575));
+    // just a random test
+    assertEquals(3518437418598500L, BlockId.getBlockId(100, 100, 100));
+    // min value of blockId
+    assertEquals(0L, BlockId.getBlockId(0, 0, 0));
+
+    BlockId blockId = BlockId.fromIds(100, 100, 100);
+    assertEquals("blockId[c80000c800064 (seq: 100, part: 100, task: 100)]", blockId.toString());
+
+    // decoding must return the components that were encoded
+    assertEquals(24287, BlockId.getSequenceNo(854558029292503039L));
+    assertEquals(16777215, BlockId.getPartitionId(854558029292503039L));
+    assertEquals(1048575, BlockId.getTaskAttemptId(854558029292503039L));
+    BlockId decoded = BlockId.fromLong(3518437418598500L);
+    assertEquals(3518437418598500L, decoded.blockId);
+    assertEquals(100, decoded.sequenceNo);
+    assertEquals(100, decoded.partitionId);
+    assertEquals(100, decoded.taskAttemptId);
+
+    // values above the maximum of a component are rejected
+    final Throwable e1 =
+        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(262144, 0, 0));
+    assertEquals("Can't support sequence[262144], the max value should be 262143", e1.getMessage());
+
+    final Throwable e2 =
+        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(0, 16777216, 0));
+    assertEquals(
+        "Can't support partitionId[16777216], the max value should be 16777215", e2.getMessage());
+
+    final Throwable e3 =
+        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(0, 0, 2097152));
+    assertEquals(
+        "Can't support taskAttemptId[2097152], the max value should be 2097151", e3.getMessage());
+
+    // negative components are rejected
+    final Throwable e4 =
+        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(-1, 0, 0));
+    assertEquals("Can't support sequence[-1], the max value should be 262143", e4.getMessage());
+
+    final Throwable e5 =
+        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(0, -1, 0));
+    assertEquals(
+        "Can't support partitionId[-1], the max value should be 16777215", e5.getMessage());
+
+    final Throwable e6 =
+        assertThrows(IllegalArgumentException.class, () -> BlockId.getBlockId(0, 0, -1));
+    assertEquals(
+        "Can't support taskAttemptId[-1], the max value should be 2097151", e6.getMessage());
+  }
+}
diff --git a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index 0e70486..d8457ae 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -229,10 +229,16 @@
public void testShuffleBitmapToPartitionBitmap() {
Roaring64NavigableMap partition1Bitmap =
Roaring64NavigableMap.bitmapOf(
- getBlockId(0, 0, 0), getBlockId(0, 0, 1), getBlockId(0, 1, 0), getBlockId(0, 1, 1));
+ BlockId.getBlockId(0, 0, 0),
+ BlockId.getBlockId(1, 0, 0),
+ BlockId.getBlockId(0, 0, 1),
+ BlockId.getBlockId(1, 0, 1));
Roaring64NavigableMap partition2Bitmap =
Roaring64NavigableMap.bitmapOf(
- getBlockId(1, 0, 0), getBlockId(1, 0, 1), getBlockId(1, 1, 0), getBlockId(1, 1, 1));
+ BlockId.getBlockId(0, 1, 0),
+ BlockId.getBlockId(1, 1, 0),
+ BlockId.getBlockId(0, 1, 1),
+ BlockId.getBlockId(1, 1, 1));
Roaring64NavigableMap shuffleBitmap = Roaring64NavigableMap.bitmapOf();
shuffleBitmap.or(partition1Bitmap);
shuffleBitmap.or(partition2Bitmap);
@@ -291,13 +297,6 @@
});
}
- // Copy from ClientUtils
- private Long getBlockId(long partitionId, long taskAttemptId, long atomicInt) {
- return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
- + taskAttemptId;
- }
-
interface RssUtilTestDummy {
String get();
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
index dc43ca1..c21808c 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
@@ -22,7 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -40,12 +40,12 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.segment.FixedSizeSegmentSplitter;
import org.apache.uniffle.common.segment.SegmentSplitter;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.Constants;
public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
- private static AtomicLong ATOMIC_LONG = new AtomicLong(0L);
+ private static AtomicInteger ATOMIC_INT = new AtomicInteger(0);
public static List<ShuffleServerInfo> mockSSI =
Lists.newArrayList(new ShuffleServerInfo("id", "host", 0));
@@ -62,11 +62,9 @@
for (int i = 0; i < blockNum; i++) {
byte[] buf = new byte[length];
new Random().nextBytes(buf);
- long seqno = ATOMIC_LONG.getAndIncrement();
+ int seqno = ATOMIC_INT.getAndIncrement();
- long blockId =
- (seqno << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + taskAttemptId;
+ long blockId = BlockId.getBlockId(seqno, 0, taskAttemptId);
blockIdBitmap.addLong(blockId);
dataMap.put(blockId, buf);
shuffleBlockInfoList.add(
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index a1d662c..64d7739 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -56,7 +56,6 @@
import org.apache.uniffle.client.response.RssRegisterShuffleResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
-import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
@@ -65,6 +64,7 @@
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.metrics.TestUtils;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.proto.RssProtos;
@@ -338,17 +338,17 @@
assertEquals(expectedP3, blockIdBitmap);
partitionToBlockIds = Maps.newHashMap();
- blockIds1 = getBlockIdList((int) Constants.MAX_PARTITION_ID, 3);
+ blockIds1 = getBlockIdList(Constants.MAX_PARTITION_ID, 3);
blockIds2 = getBlockIdList(2, 2);
blockIds3 = getBlockIdList(3, 1);
- partitionToBlockIds.put((int) Constants.MAX_PARTITION_ID, blockIds1);
+ partitionToBlockIds.put(Constants.MAX_PARTITION_ID, blockIds1);
partitionToBlockIds.put(2, blockIds2);
partitionToBlockIds.put(3, blockIds3);
// bimapNum = 2
request = new RssReportShuffleResultRequest("shuffleResultTest", 4, 1L, partitionToBlockIds, 2);
shuffleServerClient.reportShuffleResult(request);
- req = new RssGetShuffleResultRequest("shuffleResultTest", 4, (int) Constants.MAX_PARTITION_ID);
+ req = new RssGetShuffleResultRequest("shuffleResultTest", 4, Constants.MAX_PARTITION_ID);
result = shuffleServerClient.getShuffleResult(req);
blockIdBitmap = result.getBlockIdBitmap();
expectedP1 = Roaring64NavigableMap.bitmapOf();
@@ -610,7 +610,7 @@
for (int i = 0; i < 100; i++) {
Map<Integer, List<Long>> ptbs = Maps.newHashMap();
List<Long> blockIds = Lists.newArrayList();
- Long blockId = ClientUtils.getBlockId(1, 0, i);
+ Long blockId = BlockId.getBlockId(i, 1, 0);
expectedBlockIds.add(blockId);
blockIds.add(blockId);
ptbs.put(1, blockIds);
@@ -624,7 +624,7 @@
for (int i = 100; i < 200; i++) {
Map<Integer, List<Long>> ptbs = Maps.newHashMap();
List<Long> blockIds = Lists.newArrayList();
- Long blockId = ClientUtils.getBlockId(1, 1, i);
+ Long blockId = BlockId.getBlockId(i, 1, 1);
expectedBlockIds.add(blockId);
blockIds.add(blockId);
ptbs.put(1, blockIds);
@@ -638,7 +638,7 @@
for (int i = 200; i < 300; i++) {
Map<Integer, List<Long>> ptbs = Maps.newHashMap();
List<Long> blockIds = Lists.newArrayList();
- Long blockId = ClientUtils.getBlockId(1, 2, i);
+ Long blockId = BlockId.getBlockId(i, 1, 2);
expectedBlockIds.add(blockId);
blockIds.add(blockId);
ptbs.put(1, blockIds);
@@ -984,7 +984,7 @@
private List<Long> getBlockIdList(int partitionId, int blockNum) {
List<Long> blockIds = Lists.newArrayList();
for (int i = 0; i < blockNum; i++) {
- blockIds.add(ClientUtils.getBlockId(partitionId, 0, atomicInteger.getAndIncrement()));
+ blockIds.add(BlockId.getBlockId(atomicInteger.getAndIncrement(), partitionId, 0));
}
return blockIds;
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 1442b21..d87bada 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -37,7 +37,6 @@
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.response.SendShuffleDataResult;
-import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
@@ -45,6 +44,7 @@
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -203,7 +203,7 @@
Set<Long> blockIds = Sets.newHashSet();
int partitionIdx = 1;
for (int i = 0; i < 5; i++) {
- blockIds.add(ClientUtils.getBlockId(partitionIdx, 0, i));
+ blockIds.add(BlockId.getBlockId(i, partitionIdx, 0));
}
partitionToBlocks.put(partitionIdx, blockIds);
serverToPartitionToBlockIds.put(shuffleServerInfo1, partitionToBlocks);
@@ -251,7 +251,7 @@
Map<Integer, Set<Long>> partitionToBlocks1 = Maps.newHashMap();
Set<Long> blockIds = Sets.newHashSet();
for (int i = 0; i < 5; i++) {
- blockIds.add(ClientUtils.getBlockId(1, 0, i));
+ blockIds.add(BlockId.getBlockId(i, 1, 0));
}
partitionToBlocks1.put(1, blockIds);
Map<ShuffleServerInfo, Map<Integer, Set<Long>>> serverToPartitionToBlockIds = Maps.newHashMap();
@@ -260,7 +260,7 @@
Map<Integer, Set<Long>> partitionToBlocks2 = Maps.newHashMap();
blockIds = Sets.newHashSet();
for (int i = 0; i < 7; i++) {
- blockIds.add(ClientUtils.getBlockId(2, 0, i));
+ blockIds.add(BlockId.getBlockId(i, 2, 0));
}
partitionToBlocks2.put(2, blockIds);
serverToPartitionToBlockIds.put(shuffleServerInfo2, partitionToBlocks2);
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index 2eb2ffa..65b699e 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -42,7 +42,7 @@
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
@@ -105,7 +105,7 @@
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
createTestData(testAppId, expectedData, blockIdBitmap, taskIdBitmap);
- blockIdBitmap.addLong((1 << Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+ blockIdBitmap.addLong(BlockId.getBlockId(0, 1, 0));
ShuffleReadClientImpl readClient;
readClient =
baseReadBuilder()
@@ -252,7 +252,9 @@
Roaring64NavigableMap wrongBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
LongIterator iter = blockIdBitmap.getLongIterator();
while (iter.hasNext()) {
- wrongBlockIdBitmap.addLong(iter.next() + (1 << Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+ BlockId blockId = BlockId.fromLong(iter.next());
+ wrongBlockIdBitmap.addLong(
+ BlockId.getBlockId(blockId.sequenceNo, blockId.partitionId + 1, blockId.taskAttemptId));
}
ShuffleReadClientImpl readClient =
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 7977b80..c40003c 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -62,6 +62,7 @@
import org.apache.uniffle.common.exception.NoRegisterException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
@@ -573,11 +574,9 @@
Set<Integer> requestPartitions,
Roaring64NavigableMap bitmap,
Roaring64NavigableMap resultBitmap) {
- final long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
bitmap.forEach(
blockId -> {
- int partitionId =
- Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
+ int partitionId = BlockId.getPartitionId(blockId);
if (requestPartitions.contains(partitionId)) {
resultBitmap.addLong(blockId);
}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index fb1de11..426ef4b 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -54,6 +54,7 @@
import org.apache.uniffle.common.exception.NoRegisterException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RssUtils;
@@ -714,7 +715,7 @@
for (int taskId = 1; taskId < 10; taskId++) {
for (int partitionId = 1; partitionId < 10; partitionId++) {
for (int i = 0; i < 2; i++) {
- long blockId = getBlockId(partitionId, taskId, i);
+ long blockId = BlockId.getBlockId(i, partitionId, taskId);
bitmapBlockIds.addLong(blockId);
if (partitionId == expectedPartitionId) {
expectedBlockIds.addLong(blockId);
@@ -727,19 +728,19 @@
Sets.newHashSet(expectedPartitionId), bitmapBlockIds, Roaring64NavigableMap.bitmapOf());
assertEquals(expectedBlockIds, resultBlockIds);
- bitmapBlockIds.addLong(getBlockId(0, 0, 0));
+ bitmapBlockIds.addLong(BlockId.getBlockId(0, 0, 0));
resultBlockIds =
shuffleTaskManager.getBlockIdsByPartitionId(
Sets.newHashSet(0), bitmapBlockIds, Roaring64NavigableMap.bitmapOf());
assertEquals(Roaring64NavigableMap.bitmapOf(0L), resultBlockIds);
long expectedBlockId =
- getBlockId(
- Constants.MAX_PARTITION_ID, Constants.MAX_TASK_ATTEMPT_ID, Constants.MAX_SEQUENCE_NO);
+ BlockId.getBlockId(
+ Constants.MAX_SEQUENCE_NO, Constants.MAX_PARTITION_ID, Constants.MAX_TASK_ATTEMPT_ID);
bitmapBlockIds.addLong(expectedBlockId);
resultBlockIds =
shuffleTaskManager.getBlockIdsByPartitionId(
- Sets.newHashSet(Math.toIntExact(Constants.MAX_PARTITION_ID)),
+ Sets.newHashSet(Constants.MAX_PARTITION_ID),
bitmapBlockIds,
Roaring64NavigableMap.bitmapOf());
assertEquals(Roaring64NavigableMap.bitmapOf(expectedBlockId), resultBlockIds);
@@ -757,7 +758,7 @@
for (int taskId = 1; taskId < 10; taskId++) {
for (int partitionId = 1; partitionId < 10; partitionId++) {
for (int i = 0; i < 2; i++) {
- long blockId = getBlockId(partitionId, taskId, i);
+ long blockId = BlockId.getBlockId(i, partitionId, taskId);
bitmapBlockIds.addLong(blockId);
if (partitionId >= startPartition && partitionId <= endPartition) {
expectedBlockIds.addLong(blockId);
@@ -830,7 +831,7 @@
long[] blockIds = new long[taskNum * blocksPerTask];
for (int taskId = 0; taskId < taskNum; taskId++) {
for (int i = 0; i < blocksPerTask; i++) {
- long blockId = getBlockId(partitionId, taskId, i);
+ long blockId = BlockId.getBlockId(i, partitionId, taskId);
blockIds[taskId * blocksPerTask + i] = blockId;
}
}
@@ -993,13 +994,6 @@
return appIdsOnDisk;
}
- // copy from ClientUtils
- private Long getBlockId(long partitionId, long taskAttemptId, long atomicInt) {
- return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
- + taskAttemptId;
- }
-
private void waitForFlush(
ShuffleFlushManager shuffleFlushManager, String appId, int shuffleId, int expectedBlockNum)
throws Exception {
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/FileBasedShuffleSegment.java b/storage/src/main/java/org/apache/uniffle/storage/common/FileBasedShuffleSegment.java
index d4e25c4..38f3abb 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/FileBasedShuffleSegment.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/FileBasedShuffleSegment.java
@@ -19,6 +19,8 @@
import java.util.Objects;
+import org.apache.uniffle.common.util.BlockId;
+
public class FileBasedShuffleSegment extends ShuffleSegment
implements Comparable<FileBasedShuffleSegment> {
@@ -119,9 +121,9 @@
+ uncompressLength
+ "], crc["
+ crc
- + "], blockId["
- + blockId
- + "], taskAttemptId["
+ + "], "
+ + BlockId.toString(blockId)
+ + ", taskAttemptId["
+ taskAttemptId
+ "]}";
}
diff --git a/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java b/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
index b79d18f..7b30493 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
@@ -21,7 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
@@ -30,9 +30,9 @@
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import org.apache.uniffle.storage.handler.impl.HadoopFileReader;
import org.apache.uniffle.storage.handler.impl.HadoopFileWriter;
@@ -43,7 +43,7 @@
public class HadoopShuffleHandlerTestBase {
- private static final AtomicLong ATOMIC_LONG = new AtomicLong(0);
+ private static final AtomicInteger ATOMIC_INT = new AtomicInteger(0);
public static void writeTestData(
HadoopShuffleWriteHandler writeHandler,
@@ -56,10 +56,7 @@
for (int i = 0; i < num; i++) {
byte[] buf = new byte[length];
new Random().nextBytes(buf);
- long blockId =
- (ATOMIC_LONG.getAndIncrement()
- << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + taskAttemptId;
+ long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), 0, taskAttemptId);
blocks.add(
new ShufflePartitionedBlock(
length, length, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf));
@@ -84,10 +81,7 @@
for (int i = 0; i < num; i++) {
byte[] buf = new byte[length];
new Random().nextBytes(buf);
- long blockId =
- (ATOMIC_LONG.getAndIncrement()
- << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + taskAttemptId;
+ long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), 0, taskAttemptId);
blocks.add(
new ShufflePartitionedBlock(
length, length, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf));
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
index ed38338..8b307a3 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
@@ -36,8 +36,8 @@
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.HadoopShuffleHandlerTestBase;
import org.apache.uniffle.storage.HadoopTestBase;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
@@ -118,10 +118,7 @@
List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
byte[] buf = new byte[blockSize];
new Random().nextBytes(buf);
- long blockId =
- (expectTotalBlockNum
- << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + taskAttemptId;
+ long blockId = BlockId.getBlockId(expectTotalBlockNum, 0, taskAttemptId);
blocks.add(
new ShufflePartitionedBlock(
blockSize, blockSize, ChecksumUtils.getCrc32(buf), blockId, taskAttemptId, buf));
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
index e02387e..19a2905 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
@@ -22,7 +22,7 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -33,6 +33,7 @@
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.segment.FixedSizeSegmentSplitter;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
@@ -43,14 +44,14 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
public class LocalFileHandlerTestBase {
- private static AtomicLong ATOMIC_LONG = new AtomicLong(0L);
+ private static AtomicInteger ATOMIC_INT = new AtomicInteger(0);
public static List<ShufflePartitionedBlock> generateBlocks(int num, int length) {
List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
for (int i = 0; i < num; i++) {
byte[] buf = new byte[length];
new Random().nextBytes(buf);
- long blockId = ATOMIC_LONG.incrementAndGet();
+ long blockId = BlockId.getBlockId(ATOMIC_INT.incrementAndGet(), 0, 100);
blocks.add(
new ShufflePartitionedBlock(
length, length, ChecksumUtils.getCrc32(buf), blockId, 100, buf));