[SPARK-32124][CORE][FOLLOW-UP] Use the invalid value Int.MinValue to fill the map index when reading event logs from an old Spark version
### What changes were proposed in this pull request?
Use the invalid value Int.MinValue to fill the map index when reading event logs written by an old Spark version.
### Why are the changes needed?
Follow-up PR for #28941.
### Does this PR introduce _any_ user-facing change?
When the Spark 3.0 history server reads an event log written by an old Spark version, the missing map index is filled with the invalid value Int.MinValue, and the FetchFailed error string renders it as "Unknown".
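For illustration, a minimal sketch of the new rendering (the executor/host/port values are hypothetical; the constructor argument order follows the updated test below):

```scala
import org.apache.spark.FetchFailed
import org.apache.spark.storage.BlockManagerId

// A FetchFailed rebuilt from a pre-3.0 event log: the absent map index
// has been filled with the sentinel Int.MinValue.
val fetchFailed = FetchFailed(
  BlockManagerId("exec-1", "host-1", 7337), // hypothetical block manager
  17, 16L, Int.MinValue, 19, "fetch failure message")

// The sentinel is shown as "Unknown" rather than a misleading number, e.g.:
// FetchFailed(..., shuffleId=17, mapIndex=Unknown, mapId=16, reduceId=19, ...)
println(fetchFailed.toErrorString)
```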
### How was this patch tested?
Existing UT.
Closes #28965 from xuanyuanking/follow-up.
Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index b13028f..6606d31 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -90,7 +90,8 @@
extends TaskFailedReason {
override def toErrorString: String = {
val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
- s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapIndex=$mapIndex, " +
+ val mapIndexString = if (mapIndex == Int.MinValue) "Unknown" else mapIndex.toString
+ s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapIndex=$mapIndexString, " +
s"mapId=$mapId, reduceId=$reduceId, message=\n$message\n)"
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index ced3f9d..f337250 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -1078,8 +1078,12 @@
val blockManagerAddress = blockManagerIdFromJson(json \ "Block Manager Address")
val shuffleId = (json \ "Shuffle ID").extract[Int]
val mapId = (json \ "Map ID").extract[Long]
- val mapIndex = (json \ "Map Index") match {
- case JNothing => 0
+ val mapIndex = json \ "Map Index" match {
+ case JNothing =>
+ // Note, we use the invalid value Int.MinValue here to fill the map index for backward
+ // compatibility. Otherwise, the fetch failed event will be dropped when the history
+ // server loads the event log written by the Spark version before 3.0.
+ Int.MinValue
case x => x.extract[Int]
}
val reduceId = (json \ "Reduce ID").extract[Int]
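As a standalone sketch of the fallback above (plain json4s with a hypothetical pre-3.0 JSON fragment, not the full JsonProtocol deserialization path):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats: Formats = DefaultFormats

// A pre-3.0 "FetchFailed" fragment: the "Map Index" field does not exist yet.
val oldJson = parse("""{"Shuffle ID": 17, "Map ID": 16, "Reduce ID": 19}""")

// Missing field => JNothing => fall back to the sentinel Int.MinValue.
val mapIndex = oldJson \ "Map Index" match {
  case JNothing => Int.MinValue
  case x => x.extract[Int]
}
assert(mapIndex == Int.MinValue)
```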
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 955589f..c75e98f 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -334,7 +334,7 @@
val oldEvent = JsonProtocol.taskEndReasonToJson(fetchFailed)
.removeField({ _._1 == "Map Index" })
val expectedFetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 16L,
- 0, 19, "ignored")
+ Int.MinValue, 19, "ignored")
assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent))
}