[SPARK-48208][SS] Skip providing memory usage metrics from RocksDB if bounded memory usage is enabled
### What changes were proposed in this pull request?
Skip providing memory usage metrics from RocksDB if bounded memory usage is enabled
### Why are the changes needed?
Without this change, when bounded memory usage is enabled, the memory usage we report at the partition level is actually the maximum usage per node.
For example, if we report this:
```
"allRemovalsTimeMs" : 93,
"commitTimeMs" : 32240,
"memoryUsedBytes" : 15956211724278,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 200,
"numStateStoreInstances" : 200,
```
We have 200 partitions in this case, so the reported memory usage per partition / state store would be ~78GB. However, each node has 256GB of memory in total and we have 2 such nodes. We have configured the cluster to use 30% of the available memory on each node for RocksDB, which is ~77GB.
So the memory being reported here is actually per node rather than per partition, which can be confusing for users.
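The arithmetic above can be sketched as a quick sanity check. This is an illustrative snippet (not part of the patch), using the values from the metrics snippet and cluster description:

```scala
object MemUsageSanityCheck extends App {
  // Reported metric and partition count from the example above.
  val memoryUsedBytes = 15956211724278L
  val numPartitions = 200

  // Naive per-partition reading of the metric, in GiB.
  val perPartitionGb = memoryUsedBytes.toDouble / numPartitions / (1L << 30)
  // ≈ 74 GiB per partition, implausibly large for a single state store.

  // Per-node bounded cache size from the cluster configuration:
  // 256 GB of node memory, 30% reserved for RocksDB.
  val nodeMemGb = 256.0
  val rocksDbFraction = 0.30
  val boundedCacheGb = nodeMemGb * rocksDbFraction
  // ≈ 77 GB, which matches the "per-partition" number almost exactly,
  // showing the metric is really the shared per-node cache usage.

  println(f"per partition: $perPartitionGb%.1f GiB, node cache: $boundedCacheGb%.1f GB")
}
```

The near-match between the "per-partition" figure and the per-node cache budget is what reveals that the metric is being attributed to the wrong level.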
### Does this PR introduce _any_ user-facing change?
No, this is only a change in metrics reporting.
### How was this patch tested?
Added unit tests
```
[info] Run completed in 10 seconds, 878 milliseconds.
[info] Total number of tests run: 24
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 24, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #46491 from anishshri-db/task/SPARK-48208.
Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index caecf81..1516951 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -777,10 +777,19 @@
.keys.filter(checkInternalColumnFamilies(_)).size
val numExternalColFamilies = colFamilyNameToHandleMap.keys.size - numInternalColFamilies
+ // If bounded memory usage is enabled, we share the block cache across all state providers
+ // running on the same node and account for the usage against this single cache. In this
+ // case, it's not possible to provide partition-level or query-level memory usage.
+ val memoryUsage = if (conf.boundedMemoryUsage) {
+ 0L
+ } else {
+ readerMemUsage + memTableMemUsage + blockCacheUsage
+ }
+
RocksDBMetrics(
numKeysOnLoadedVersion,
numKeysOnWritingVersion,
- readerMemUsage + memTableMemUsage + blockCacheUsage,
+ memoryUsage,
pinnedBlocksMemUsage,
totalSSTFilesBytes,
nativeOpsLatencyMicros,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index ab2afa1..6086fd4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -1699,6 +1699,11 @@
db.load(0)
db.put("a", "1")
db.commit()
+ if (boundedMemoryUsage == "true") {
+ assert(db.metricsOpt.get.totalMemUsageBytes === 0)
+ } else {
+ assert(db.metricsOpt.get.totalMemUsageBytes > 0)
+ }
db.getWriteBufferManagerAndCache()
}
@@ -1709,6 +1714,11 @@
db.load(0)
db.put("a", "1")
db.commit()
+ if (boundedMemoryUsage == "true") {
+ assert(db.metricsOpt.get.totalMemUsageBytes === 0)
+ } else {
+ assert(db.metricsOpt.get.totalMemUsageBytes > 0)
+ }
db.getWriteBufferManagerAndCache()
}
@@ -1758,6 +1768,7 @@
db.remove("a")
db.put("c", "3")
db.commit()
+ assert(db.metricsOpt.get.totalMemUsageBytes === 0)
}
} finally {
RocksDBMemoryManager.resetWriteBufferManagerAndCache