[SPARK-29434][CORE] Improve the MapStatuses Serialization Performance

### What changes were proposed in this pull request?
Instead of using GZIP for compressing the serialized `MapStatuses`, ZStd provides better compression rate and faster compression time.

The original approach is serializing and writing data directly into `GZIPOutputStream` as one step; however, the compression time is faster if a bigger chuck of the data is processed by the codec at once. As a result, in this PR, the serialized data is written into an uncompressed byte array first, and then the data is compressed. For smaller `MapStatues`, we find it's 2x faster.

Here is the benchmark result.

#### 20k map outputs, and each has 500 blocks
1. ZStd two steps in this PR: 0.402 ops/ms, 89,066 bytes
2. ZStd one step as the original approach: 0.370 ops/ms, 89,069 bytes
3. GZip: 0.092 ops/ms, 217,345 bytes

#### 20k map outputs, and each has 5 blocks
1. ZStd two steps in this PR: 0.9 ops/ms, 75,449 bytes
2. ZStd one step as the original approach: 0.38 ops/ms, 75,452 bytes
3. GZip: 0.21 ops/ms, 160,094 bytes

### Why are the changes needed?
Decrease the time for serializing the `MapStatuses` in large scale job.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #26085 from dbtsai/mapStatus.

Lead-authored-by: DB Tsai <d_tsai@apple.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
index 747aae0..7a6cfb7 100644
--- a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
+++ b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
@@ -2,21 +2,21 @@
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 200000 MapOutputs, 10 blocks w/ broadcast:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Serialization                                       609            631          22          0.3        3043.8       1.0X
-Deserialization                                     840            897          67          0.2        4201.2       0.7X
+Serialization                                       205            213          13          1.0        1023.6       1.0X
+Deserialization                                     908            939          27          0.2        4540.2       0.2X
 
-Compressed Serialized MapStatus sizes: 393 bytes
-Compressed Serialized Broadcast MapStatus sizes: 3 MB
+Compressed Serialized MapStatus sizes: 400 bytes
+Compressed Serialized Broadcast MapStatus sizes: 2 MB
 
 
 OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 200000 MapOutputs, 10 blocks w/o broadcast:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Serialization                                       591            599           8          0.3        2955.3       1.0X
-Deserialization                                     878            913          31          0.2        4392.2       0.7X
+Serialization                                       195            204          24          1.0         976.9       1.0X
+Deserialization                                     913            940          33          0.2        4566.7       0.2X
 
-Compressed Serialized MapStatus sizes: 3 MB
+Compressed Serialized MapStatus sizes: 2 MB
 Compressed Serialized Broadcast MapStatus sizes: 0 bytes
 
 
@@ -24,21 +24,21 @@
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 200000 MapOutputs, 100 blocks w/ broadcast:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Serialization                                      1776           1778           2          0.1        8880.5       1.0X
-Deserialization                                    1086           1086           0          0.2        5427.9       1.6X
+Serialization                                       616            619           3          0.3        3079.1       1.0X
+Deserialization                                     936            954          22          0.2        4680.5       0.7X
 
-Compressed Serialized MapStatus sizes: 411 bytes
-Compressed Serialized Broadcast MapStatus sizes: 15 MB
+Compressed Serialized MapStatus sizes: 418 bytes
+Compressed Serialized Broadcast MapStatus sizes: 14 MB
 
 
 OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 200000 MapOutputs, 100 blocks w/o broadcast:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Serialization                                      1725           1726           1          0.1        8624.9       1.0X
-Deserialization                                    1093           1094           2          0.2        5463.6       1.6X
+Serialization                                       586            588           3          0.3        2928.8       1.0X
+Deserialization                                     929            933           4          0.2        4647.0       0.6X
 
-Compressed Serialized MapStatus sizes: 15 MB
+Compressed Serialized MapStatus sizes: 14 MB
 Compressed Serialized Broadcast MapStatus sizes: 0 bytes
 
 
@@ -46,21 +46,21 @@
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 200000 MapOutputs, 1000 blocks w/ broadcast:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Serialization                                     12421          12522         142          0.0       62104.4       1.0X
-Deserialization                                    3020           3043          32          0.1       15102.0       4.1X
+Serialization                                      4740           4916         249          0.0       23698.5       1.0X
+Deserialization                                    1578           1597          27          0.1        7890.6       3.0X
 
-Compressed Serialized MapStatus sizes: 544 bytes
-Compressed Serialized Broadcast MapStatus sizes: 131 MB
+Compressed Serialized MapStatus sizes: 546 bytes
+Compressed Serialized Broadcast MapStatus sizes: 123 MB
 
 
 OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 200000 MapOutputs, 1000 blocks w/o broadcast:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Serialization                                     11719          11737          26          0.0       58595.3       1.0X
-Deserialization                                    3018           3051          46          0.1       15091.7       3.9X
+Serialization                                      4492           4573         115          0.0       22458.3       1.0X
+Deserialization                                    1533           1547          20          0.1        7664.8       2.9X
 
-Compressed Serialized MapStatus sizes: 131 MB
+Compressed Serialized MapStatus sizes: 123 MB
 Compressed Serialized Broadcast MapStatus sizes: 0 bytes
 
 
diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
index 1f479a4..0c64969 100644
--- a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
+++ b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
@@ -2,21 +2,21 @@
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 200000 MapOutputs, 10 blocks w/ broadcast:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Serialization                                       625            639           9          0.3        3127.2       1.0X
-Deserialization                                     875            931          49          0.2        4376.2       0.7X
+Serialization                                       236            245          18          0.8        1179.1       1.0X
+Deserialization                                     842            885          37          0.2        4211.4       0.3X
 
-Compressed Serialized MapStatus sizes: 393 bytes
-Compressed Serialized Broadcast MapStatus sizes: 3 MB
+Compressed Serialized MapStatus sizes: 400 bytes
+Compressed Serialized Broadcast MapStatus sizes: 2 MB
 
 
 OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 200000 MapOutputs, 10 blocks w/o broadcast:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Serialization                                       604            640          71          0.3        3018.4       1.0X
-Deserialization                                     889            903          17          0.2        4443.8       0.7X
+Serialization                                       213            219           8          0.9        1065.1       1.0X
+Deserialization                                     846            870          33          0.2        4228.6       0.3X
 
-Compressed Serialized MapStatus sizes: 3 MB
+Compressed Serialized MapStatus sizes: 2 MB
 Compressed Serialized Broadcast MapStatus sizes: 0 bytes
 
 
@@ -24,21 +24,21 @@
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 200000 MapOutputs, 100 blocks w/ broadcast:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Serialization                                      1879           1880           2          0.1        9394.9       1.0X
-Deserialization                                    1147           1150           5          0.2        5733.8       1.6X
+Serialization                                       624            709         167          0.3        3121.1       1.0X
+Deserialization                                     885            908          22          0.2        4427.0       0.7X
 
-Compressed Serialized MapStatus sizes: 411 bytes
-Compressed Serialized Broadcast MapStatus sizes: 15 MB
+Compressed Serialized MapStatus sizes: 418 bytes
+Compressed Serialized Broadcast MapStatus sizes: 14 MB
 
 
 OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 200000 MapOutputs, 100 blocks w/o broadcast:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Serialization                                      1825           1826           1          0.1        9123.3       1.0X
-Deserialization                                    1147           1281         189          0.2        5735.7       1.6X
+Serialization                                       603            604           2          0.3        3014.9       1.0X
+Deserialization                                     892            895           5          0.2        4458.7       0.7X
 
-Compressed Serialized MapStatus sizes: 15 MB
+Compressed Serialized MapStatus sizes: 14 MB
 Compressed Serialized Broadcast MapStatus sizes: 0 bytes
 
 
@@ -46,21 +46,21 @@
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 200000 MapOutputs, 1000 blocks w/ broadcast:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Serialization                                     12327          12518         270          0.0       61634.3       1.0X
-Deserialization                                    3120           3133          18          0.1       15600.8       4.0X
+Serialization                                      4612           4945         471          0.0       23061.0       1.0X
+Deserialization                                    1493           1495           2          0.1        7466.3       3.1X
 
-Compressed Serialized MapStatus sizes: 544 bytes
-Compressed Serialized Broadcast MapStatus sizes: 131 MB
+Compressed Serialized MapStatus sizes: 546 bytes
+Compressed Serialized Broadcast MapStatus sizes: 123 MB
 
 
 OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 200000 MapOutputs, 1000 blocks w/o broadcast:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Serialization                                     11928          11986          82          0.0       59642.2       1.0X
-Deserialization                                    3137           3138           2          0.1       15683.3       3.8X
+Serialization                                      4452           4595         202          0.0       22261.4       1.0X
+Deserialization                                    1464           1477          18          0.1        7321.4       3.0X
 
-Compressed Serialized MapStatus sizes: 131 MB
+Compressed Serialized MapStatus sizes: 123 MB
 Compressed Serialized Broadcast MapStatus sizes: 0 bytes
 
 
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index c181fac..6f4a623 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -17,10 +17,9 @@
 
 package org.apache.spark
 
-import java.io._
+import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
 import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
 import java.util.concurrent.locks.ReentrantReadWriteLock
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, ListBuffer, Map}
@@ -29,6 +28,10 @@
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
+import com.github.luben.zstd.ZstdInputStream
+import com.github.luben.zstd.ZstdOutputStream
+import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOutputStream}
+
 import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -885,13 +888,18 @@
   private val BROADCAST = 1
 
   // Serialize an array of map output locations into an efficient byte format so that we can send
-  // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
+  // it to reduce tasks. We do this by compressing the serialized bytes using Zstd. They will
   // generally be pretty compressible because many map outputs will be on the same hostname.
   def serializeMapStatuses(statuses: Array[MapStatus], broadcastManager: BroadcastManager,
       isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]]) = {
-    val out = new ByteArrayOutputStream
-    out.write(DIRECT)
-    val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
+    // Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one
+    // This implementation doesn't reallocate the whole memory block but allocates
+    // additional buffers. This way no buffers need to be garbage collected and
+    // the contents don't have to be copied to the new buffer.
+    val out = new ApacheByteArrayOutputStream()
+    val compressedOut = new ApacheByteArrayOutputStream()
+
+    val objOut = new ObjectOutputStream(out)
     Utils.tryWithSafeFinally {
       // Since statuses can be modified in parallel, sync on it
       statuses.synchronized {
@@ -900,18 +908,42 @@
     } {
       objOut.close()
     }
-    val arr = out.toByteArray
+
+    val arr: Array[Byte] = {
+      val zos = new ZstdOutputStream(compressedOut)
+      Utils.tryWithSafeFinally {
+        compressedOut.write(DIRECT)
+        // `out.writeTo(zos)` will write the uncompressed data from `out` to `zos`
+        //  without copying to avoid unnecessary allocation and copy of byte[].
+        out.writeTo(zos)
+      } {
+        zos.close()
+      }
+      compressedOut.toByteArray
+    }
     if (arr.length >= minBroadcastSize) {
       // Use broadcast instead.
       // Important arr(0) is the tag == DIRECT, ignore that while deserializing !
       val bcast = broadcastManager.newBroadcast(arr, isLocal)
       // toByteArray creates copy, so we can reuse out
       out.reset()
-      out.write(BROADCAST)
-      val oos = new ObjectOutputStream(new GZIPOutputStream(out))
-      oos.writeObject(bcast)
-      oos.close()
-      val outArr = out.toByteArray
+      val oos = new ObjectOutputStream(out)
+      Utils.tryWithSafeFinally {
+        oos.writeObject(bcast)
+      } {
+        oos.close()
+      }
+      val outArr = {
+        compressedOut.reset()
+        val zos = new ZstdOutputStream(compressedOut)
+        Utils.tryWithSafeFinally {
+          compressedOut.write(BROADCAST)
+          out.writeTo(zos)
+        } {
+          zos.close()
+        }
+        compressedOut.toByteArray
+      }
       logInfo("Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length)
       (outArr, bcast)
     } else {
@@ -924,7 +956,7 @@
     assert (bytes.length > 0)
 
     def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = {
-      val objIn = new ObjectInputStream(new GZIPInputStream(
+      val objIn = new ObjectInputStream(new ZstdInputStream(
         new ByteArrayInputStream(arr, off, len)))
       Utils.tryWithSafeFinally {
         objIn.readObject()