[#1373][FOLLOWUP] fix(spark): incorrect partition id type (#1582)
### What changes were proposed in this pull request?
fix partition id type is incorrect
### Why are the changes needed?
Fix: (#1373)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not necessary.
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 07e1a18..5017442 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -898,16 +898,17 @@
ShuffleServerInfo newAssignedServer = assignShuffleServer(shuffleId, id);
ShuffleHandleInfo shuffleHandleInfo = shuffleIdToShuffleHandleInfo.get(shuffleId);
for (String partitionId : partitionIds) {
+ Integer partitionIdInteger = Integer.valueOf(partitionId);
List<ShuffleServerInfo> shuffleServerInfoList =
- shuffleHandleInfo.getPartitionToServers().get(partitionId);
+ shuffleHandleInfo.getPartitionToServers().get(partitionIdInteger);
for (int i = 0; i < shuffleServerInfoList.size(); i++) {
if (shuffleServerInfoList.get(i).getId().equals(faultyShuffleServerId)) {
shuffleHandleInfo
.getFailoverPartitionServers()
- .computeIfAbsent(Integer.valueOf(partitionId), k -> Maps.newHashMap());
+ .computeIfAbsent(partitionIdInteger, k -> Maps.newHashMap());
shuffleHandleInfo
.getFailoverPartitionServers()
- .get(partitionId)
+ .get(partitionIdInteger)
.computeIfAbsent(i, j -> Lists.newArrayList())
.add(newAssignedServer);
}
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 1f4a4cd..b360807 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -1190,16 +1190,17 @@
ShuffleServerInfo newAssignedServer = assignShuffleServer(shuffleId, id);
ShuffleHandleInfo shuffleHandleInfo = shuffleIdToShuffleHandleInfo.get(shuffleId);
for (String partitionId : partitionIds) {
+ Integer partitionIdInteger = Integer.valueOf(partitionId);
List<ShuffleServerInfo> shuffleServerInfoList =
- shuffleHandleInfo.getPartitionToServers().get(partitionId);
+ shuffleHandleInfo.getPartitionToServers().get(partitionIdInteger);
for (int i = 0; i < shuffleServerInfoList.size(); i++) {
if (shuffleServerInfoList.get(i).getId().equals(faultyShuffleServerId)) {
shuffleHandleInfo
.getFailoverPartitionServers()
- .computeIfAbsent(Integer.valueOf(partitionId), k -> Maps.newHashMap());
+ .computeIfAbsent(partitionIdInteger, k -> Maps.newHashMap());
shuffleHandleInfo
.getFailoverPartitionServers()
- .get(partitionId)
+ .get(partitionIdInteger)
.computeIfAbsent(i, j -> Lists.newArrayList())
.add(newAssignedServer);
}