[SPARK-29434][CORE] Improve the MapStatuses Serialization Performance
### What changes were proposed in this pull request?
This PR compresses the serialized `MapStatuses` with ZStd instead of GZIP, because ZStd provides a better compression ratio and faster compression.
The original approach serializes and writes data directly into a `GZIPOutputStream` in one step; however, compression is faster when the codec processes a bigger chunk of data at once. In this PR, the serialized data is therefore written into an uncompressed byte array first and then compressed. For smaller `MapStatuses`, we find the two-step approach is about 2x faster than the one-step approach (0.9 vs. 0.38 ops/ms in the 5-block benchmark).
Here are the benchmark results.
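As an illustration only, the two-step idea can be sketched with the JDK alone. This is not the code in this PR: `TwoStepCompressionSketch`, `serializeTwoStep`, and `deserialize` are hypothetical names, and `GZIPOutputStream` stands in for `ZstdOutputStream` so the sketch runs without the zstd-jni dependency.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Hypothetical sketch; GZIP is a stand-in codec, not what the PR uses.
object TwoStepCompressionSketch {
  def serializeTwoStep(value: AnyRef): Array[Byte] = {
    // Step 1: serialize into an uncompressed in-memory buffer.
    val raw = new ByteArrayOutputStream()
    val objOut = new ObjectOutputStream(raw)
    try objOut.writeObject(value) finally objOut.close()

    // Step 2: hand the whole buffer to the codec at once.
    // `writeTo` passes the internal buffer without an intermediate copy.
    val compressed = new ByteArrayOutputStream()
    val codecOut = new GZIPOutputStream(compressed)
    try raw.writeTo(codecOut) finally codecOut.close()
    compressed.toByteArray
  }

  def deserialize(bytes: Array[Byte]): AnyRef = {
    val objIn = new ObjectInputStream(
      new GZIPInputStream(new ByteArrayInputStream(bytes)))
    try objIn.readObject() finally objIn.close()
  }

  def main(args: Array[String]): Unit = {
    // Repetitive host names, loosely mimicking why map statuses compress well.
    val hosts = Array.tabulate(1000)(i => s"host-${i % 10}")
    val bytes = serializeTwoStep(hosts)
    val restored = deserialize(bytes).asInstanceOf[Array[String]]
    println(s"compressed bytes: ${bytes.length}, round trip ok: ${restored.sameElements(hosts)}")
  }
}
```

The point of the split is that `ObjectOutputStream` tends to issue many small writes; buffering them first lets the codec see the data in one large chunk.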
#### 20k map outputs, and each has 500 blocks
1. ZStd two steps in this PR: 0.402 ops/ms, 89,066 bytes
2. ZStd one step as the original approach: 0.370 ops/ms, 89,069 bytes
3. GZip: 0.092 ops/ms, 217,345 bytes
#### 20k map outputs, and each has 5 blocks
1. ZStd two steps in this PR: 0.9 ops/ms, 75,449 bytes
2. ZStd one step as the original approach: 0.38 ops/ms, 75,452 bytes
3. GZip: 0.21 ops/ms, 160,094 bytes
### Why are the changes needed?
To decrease the time spent serializing `MapStatuses` in large-scale jobs.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #26085 from dbtsai/mapStatus.
Lead-authored-by: DB Tsai <d_tsai@apple.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
index 747aae0..7a6cfb7 100644
--- a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
+++ b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
@@ -2,21 +2,21 @@
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Serialization 609 631 22 0.3 3043.8 1.0X
-Deserialization 840 897 67 0.2 4201.2 0.7X
+Serialization 205 213 13 1.0 1023.6 1.0X
+Deserialization 908 939 27 0.2 4540.2 0.2X
-Compressed Serialized MapStatus sizes: 393 bytes
-Compressed Serialized Broadcast MapStatus sizes: 3 MB
+Compressed Serialized MapStatus sizes: 400 bytes
+Compressed Serialized Broadcast MapStatus sizes: 2 MB
OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Serialization 591 599 8 0.3 2955.3 1.0X
-Deserialization 878 913 31 0.2 4392.2 0.7X
+Serialization 195 204 24 1.0 976.9 1.0X
+Deserialization 913 940 33 0.2 4566.7 0.2X
-Compressed Serialized MapStatus sizes: 3 MB
+Compressed Serialized MapStatus sizes: 2 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
@@ -24,21 +24,21 @@
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Serialization 1776 1778 2 0.1 8880.5 1.0X
-Deserialization 1086 1086 0 0.2 5427.9 1.6X
+Serialization 616 619 3 0.3 3079.1 1.0X
+Deserialization 936 954 22 0.2 4680.5 0.7X
-Compressed Serialized MapStatus sizes: 411 bytes
-Compressed Serialized Broadcast MapStatus sizes: 15 MB
+Compressed Serialized MapStatus sizes: 418 bytes
+Compressed Serialized Broadcast MapStatus sizes: 14 MB
OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Serialization 1725 1726 1 0.1 8624.9 1.0X
-Deserialization 1093 1094 2 0.2 5463.6 1.6X
+Serialization 586 588 3 0.3 2928.8 1.0X
+Deserialization 929 933 4 0.2 4647.0 0.6X
-Compressed Serialized MapStatus sizes: 15 MB
+Compressed Serialized MapStatus sizes: 14 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
@@ -46,21 +46,21 @@
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Serialization 12421 12522 142 0.0 62104.4 1.0X
-Deserialization 3020 3043 32 0.1 15102.0 4.1X
+Serialization 4740 4916 249 0.0 23698.5 1.0X
+Deserialization 1578 1597 27 0.1 7890.6 3.0X
-Compressed Serialized MapStatus sizes: 544 bytes
-Compressed Serialized Broadcast MapStatus sizes: 131 MB
+Compressed Serialized MapStatus sizes: 546 bytes
+Compressed Serialized Broadcast MapStatus sizes: 123 MB
OpenJDK 64-Bit Server VM 11.0.4+11-post-Ubuntu-1ubuntu218.04.3 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Serialization 11719 11737 26 0.0 58595.3 1.0X
-Deserialization 3018 3051 46 0.1 15091.7 3.9X
+Serialization 4492 4573 115 0.0 22458.3 1.0X
+Deserialization 1533 1547 20 0.1 7664.8 2.9X
-Compressed Serialized MapStatus sizes: 131 MB
+Compressed Serialized MapStatus sizes: 123 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
index 1f479a4..0c64969 100644
--- a/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
+++ b/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
@@ -2,21 +2,21 @@
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Serialization 625 639 9 0.3 3127.2 1.0X
-Deserialization 875 931 49 0.2 4376.2 0.7X
+Serialization 236 245 18 0.8 1179.1 1.0X
+Deserialization 842 885 37 0.2 4211.4 0.3X
-Compressed Serialized MapStatus sizes: 393 bytes
-Compressed Serialized Broadcast MapStatus sizes: 3 MB
+Compressed Serialized MapStatus sizes: 400 bytes
+Compressed Serialized Broadcast MapStatus sizes: 2 MB
OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 10 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Serialization 604 640 71 0.3 3018.4 1.0X
-Deserialization 889 903 17 0.2 4443.8 0.7X
+Serialization 213 219 8 0.9 1065.1 1.0X
+Deserialization 846 870 33 0.2 4228.6 0.3X
-Compressed Serialized MapStatus sizes: 3 MB
+Compressed Serialized MapStatus sizes: 2 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
@@ -24,21 +24,21 @@
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 100 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Serialization 1879 1880 2 0.1 9394.9 1.0X
-Deserialization 1147 1150 5 0.2 5733.8 1.6X
+Serialization 624 709 167 0.3 3121.1 1.0X
+Deserialization 885 908 22 0.2 4427.0 0.7X
-Compressed Serialized MapStatus sizes: 411 bytes
-Compressed Serialized Broadcast MapStatus sizes: 15 MB
+Compressed Serialized MapStatus sizes: 418 bytes
+Compressed Serialized Broadcast MapStatus sizes: 14 MB
OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 100 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Serialization 1825 1826 1 0.1 9123.3 1.0X
-Deserialization 1147 1281 189 0.2 5735.7 1.6X
+Serialization 603 604 2 0.3 3014.9 1.0X
+Deserialization 892 895 5 0.2 4458.7 0.7X
-Compressed Serialized MapStatus sizes: 15 MB
+Compressed Serialized MapStatus sizes: 14 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
@@ -46,21 +46,21 @@
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 1000 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Serialization 12327 12518 270 0.0 61634.3 1.0X
-Deserialization 3120 3133 18 0.1 15600.8 4.0X
+Serialization 4612 4945 471 0.0 23061.0 1.0X
+Deserialization 1493 1495 2 0.1 7466.3 3.1X
-Compressed Serialized MapStatus sizes: 544 bytes
-Compressed Serialized Broadcast MapStatus sizes: 131 MB
+Compressed Serialized MapStatus sizes: 546 bytes
+Compressed Serialized Broadcast MapStatus sizes: 123 MB
OpenJDK 64-Bit Server VM 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Serialization 11928 11986 82 0.0 59642.2 1.0X
-Deserialization 3137 3138 2 0.1 15683.3 3.8X
+Serialization 4452 4595 202 0.0 22261.4 1.0X
+Deserialization 1464 1477 18 0.1 7321.4 3.0X
-Compressed Serialized MapStatus sizes: 131 MB
+Compressed Serialized MapStatus sizes: 123 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index c181fac..6f4a623 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -17,10 +17,9 @@
package org.apache.spark
-import java.io._
+import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer, Map}
@@ -29,6 +28,10 @@
import scala.reflect.ClassTag
import scala.util.control.NonFatal
+import com.github.luben.zstd.ZstdInputStream
+import com.github.luben.zstd.ZstdOutputStream
+import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOutputStream}
+
import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -885,13 +888,18 @@
private val BROADCAST = 1
// Serialize an array of map output locations into an efficient byte format so that we can send
- // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
+ // it to reduce tasks. We do this by compressing the serialized bytes using Zstd. They will
// generally be pretty compressible because many map outputs will be on the same hostname.
def serializeMapStatuses(statuses: Array[MapStatus], broadcastManager: BroadcastManager,
isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]]) = {
- val out = new ByteArrayOutputStream
- out.write(DIRECT)
- val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
+ // Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one
+ // This implementation doesn't reallocate the whole memory block but allocates
+ // additional buffers. This way no buffers need to be garbage collected and
+ // the contents don't have to be copied to the new buffer.
+ val out = new ApacheByteArrayOutputStream()
+ val compressedOut = new ApacheByteArrayOutputStream()
+
+ val objOut = new ObjectOutputStream(out)
Utils.tryWithSafeFinally {
// Since statuses can be modified in parallel, sync on it
statuses.synchronized {
@@ -900,18 +908,42 @@
} {
objOut.close()
}
- val arr = out.toByteArray
+
+ val arr: Array[Byte] = {
+ val zos = new ZstdOutputStream(compressedOut)
+ Utils.tryWithSafeFinally {
+ compressedOut.write(DIRECT)
+ // `out.writeTo(zos)` will write the uncompressed data from `out` to `zos`
+ // without copying to avoid unnecessary allocation and copy of byte[].
+ out.writeTo(zos)
+ } {
+ zos.close()
+ }
+ compressedOut.toByteArray
+ }
if (arr.length >= minBroadcastSize) {
// Use broadcast instead.
// Important arr(0) is the tag == DIRECT, ignore that while deserializing !
val bcast = broadcastManager.newBroadcast(arr, isLocal)
// toByteArray creates copy, so we can reuse out
out.reset()
- out.write(BROADCAST)
- val oos = new ObjectOutputStream(new GZIPOutputStream(out))
- oos.writeObject(bcast)
- oos.close()
- val outArr = out.toByteArray
+ val oos = new ObjectOutputStream(out)
+ Utils.tryWithSafeFinally {
+ oos.writeObject(bcast)
+ } {
+ oos.close()
+ }
+ val outArr = {
+ compressedOut.reset()
+ val zos = new ZstdOutputStream(compressedOut)
+ Utils.tryWithSafeFinally {
+ compressedOut.write(BROADCAST)
+ out.writeTo(zos)
+ } {
+ zos.close()
+ }
+ compressedOut.toByteArray
+ }
logInfo("Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length)
(outArr, bcast)
} else {
@@ -924,7 +956,7 @@
assert (bytes.length > 0)
def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = {
- val objIn = new ObjectInputStream(new GZIPInputStream(
+ val objIn = new ObjectInputStream(new ZstdInputStream(
new ByteArrayInputStream(arr, off, len)))
Utils.tryWithSafeFinally {
objIn.readObject()